From 76475d210c81d9dc394493559bceddd7f560be30 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Tue, 22 Nov 2022 15:00:45 +0800 Subject: [PATCH 1/3] [SPARK-38005][core] Support cleaning up merged shuffle files and state from external shuffle service --- .../network/util/PushBasedShuffleUtils.java | 33 +++++ .../network/shuffle/ExternalBlockHandler.java | 4 + .../shuffle/ExternalBlockStoreClient.java | 33 ++++- .../shuffle/MergedShuffleFileManager.java | 8 ++ .../shuffle/NoOpMergedShuffleFileManager.java | 6 + .../shuffle/RemoteBlockPushResolver.java | 133 ++++++++++++++---- .../protocol/BlockTransferMessage.java | 3 +- .../shuffle/protocol/RemoveShuffleMerge.java | 99 +++++++++++++ .../shuffle/RemoteBlockPushResolverSuite.java | 8 +- .../apache/spark/scheduler/DAGScheduler.scala | 15 +- .../storage/BlockManagerMasterEndpoint.scala | 69 +++++---- 11 files changed, 337 insertions(+), 74 deletions(-) create mode 100644 common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java create mode 100644 common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java new file mode 100644 index 000000000000..235e1150618c --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +public class PushBasedShuffleUtils { + + /** + * Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple application attempts. + * This variable is derived from the String typed appAttemptId. If no appAttemptId is set, the default + * comparableAppAttemptId is -1. + */ + public static final int DEFAUT_APP_ATTEMPT_ID = -1; + + /** + * The flag for deleting the current merged shuffle data. + */ + public static final int DELETE_CURRENT_MERGED_SHUFFLE_ID = -1; +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index 4e40090b065e..5da7b026d10e 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -198,6 +198,10 @@ protected void handleMessage( int numRemovedBlocks = blockManager.removeBlocks(msg.appId, msg.execId, msg.blockIds); callback.onSuccess(new BlocksRemoved(numRemovedBlocks).toByteBuffer()); + } else if (msgObj instanceof RemoveShuffleMerge) { + RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj; + checkAuth(client, msg.appId); + mergeManager.removeShuffleMerge(msg); } else if (msgObj instanceof GetLocalDirsForExecutors) { GetLocalDirsForExecutors msg = (GetLocalDirsForExecutors) msgObj; checkAuth(client, msg.appId); diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index b066d99e8ef8..80374f756224 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -28,6 +28,8 @@ import com.codahale.metrics.MetricSet; import com.google.common.collect.Lists; +import static org.apache.spark.network.util.PushBasedShuffleUtils.DEFAUT_APP_ATTEMPT_ID; +import static org.apache.spark.network.util.PushBasedShuffleUtils.DELETE_CURRENT_MERGED_SHUFFLE_ID; import org.apache.spark.network.TransportContext; import org.apache.spark.network.buffer.ManagedBuffer; @@ -38,7 +40,14 @@ import org.apache.spark.network.crypto.AuthClientBootstrap; import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.server.NoOpRpcHandler; -import org.apache.spark.network.shuffle.protocol.*; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; +import org.apache.spark.network.shuffle.protocol.BlocksRemoved; +import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo; +import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; +import org.apache.spark.network.shuffle.protocol.MergeStatuses; +import org.apache.spark.network.shuffle.protocol.RegisterExecutor; +import org.apache.spark.network.shuffle.protocol.RemoveBlocks; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.util.TransportConf; /** @@ -52,10 +61,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient { private final boolean authEnabled; private final SecretKeyHolder secretKeyHolder; private final long registrationTimeoutMs; - // Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple - 
// application attempts. This variable is derived from the String typed appAttemptId. If no - // appAttemptId is set, the default comparableAppAttemptId is -1. - private int comparableAppAttemptId = -1; + + private int comparableAppAttemptId = DEFAUT_APP_ATTEMPT_ID; /** * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled, @@ -315,6 +322,22 @@ public void onFailure(Throwable e) { return numRemovedBlocksFuture; } + public Boolean removeShuffleMerge(String host, int port, int shuffleId) { + try { + checkInit(); + TransportClient client = clientFactory.createClient(host, port); + String appAttemptId = getAppAttemptId() == null ? String.valueOf(DEFAUT_APP_ATTEMPT_ID) : getAppAttemptId(); + ByteBuffer removeMessage = new RemoveShuffleMerge(appId, appAttemptId, shuffleId, + DELETE_CURRENT_MERGED_SHUFFLE_ID).toByteBuffer(); + client.send(removeMessage); + } catch (Throwable e) { + logger.error("Error while sending RemoveShuffleMerge request to {}:{}", + host, port, e); + return false; + } + return true; + } + @Override public void close() { checkInit(); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java index 051684a92d0b..d50921d3abd0 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java @@ -26,6 +26,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; /** * The MergedShuffleFileManager is used to process push based shuffle when enabled. 
It works @@ -121,6 +122,13 @@ MergedBlockMeta getMergedBlockMeta( */ String[] getMergedBlockDirs(String appId); + /** + * Handles the request to remove shuffle merge files. + * + * @param msg contains shuffle details (appId, shuffleId, etc) to uniquely identify a shuffle to be removed + */ + void removeShuffleMerge(RemoveShuffleMerge msg); + /** * Optionally close any resources associated the MergedShuffleFileManager, such as the * leveldb for state persistence. diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java index 876b10095938..da122c30755f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java @@ -26,6 +26,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.util.TransportConf; /** @@ -80,6 +81,11 @@ public MergedBlockMeta getMergedBlockMeta( throw new UnsupportedOperationException("Cannot handle shuffle block merge"); } + @Override + public void removeShuffleMerge(RemoveShuffleMerge msg) { + throw new UnsupportedOperationException("Cannot handle merged shuffle remove"); + } + @Override public String[] getMergedBlockDirs(String appId) { throw new UnsupportedOperationException("Cannot handle shuffle block merge"); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 816d1082850c..db3de0902296 100644 --- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -33,15 +33,14 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; - import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonProperty; @@ -56,10 +55,11 @@ import com.google.common.cache.Weigher; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; - import org.roaringbitmap.RoaringBitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.spark.network.util.PushBasedShuffleUtils.DEFAUT_APP_ATTEMPT_ID; +import static org.apache.spark.network.util.PushBasedShuffleUtils.DELETE_CURRENT_MERGED_SHUFFLE_ID; import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.buffer.ManagedBuffer; @@ -71,6 +71,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.shuffledb.DB; import org.apache.spark.network.shuffledb.DBBackend; import org.apache.spark.network.shuffledb.DBIterator; @@ -94,7 +95,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { public static final String SHUFFLE_META_DELIMITER = 
":"; public static final String MERGE_DIR_KEY = "mergeDir"; public static final String ATTEMPT_ID_KEY = "attemptId"; - private static final int UNDEFINED_ATTEMPT_ID = -1; private static final String DB_KEY_DELIMITER = ";"; private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler(); // ByteBuffer to respond to client upon a successful merge of a pushed block @@ -234,8 +234,7 @@ AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( "shuffleMergeId is higher than latest shuffleMergeId {}", appAttemptShuffleMergeId, latestShuffleMergeId); submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + deleteCurrentOutDatedShufflePartitions(appShuffleInfo, shuffleId, appAttemptShuffleMergeId)); return new AppShuffleMergePartitionsInfo(shuffleMergeId, false); } else { // The request is for block with same shuffleMergeId as the latest shuffleMergeId @@ -365,7 +364,7 @@ public String[] getMergedBlockDirs(String appId) { } private void removeOldApplicationAttemptsFromDb(AppShuffleInfo info) { - if (info.attemptId != UNDEFINED_ATTEMPT_ID) { + if (info.attemptId != DEFAUT_APP_ATTEMPT_ID) { for (int formerAttemptId = 0; formerAttemptId < info.attemptId; formerAttemptId++) { removeAppAttemptPathInfoFromDB(info.appId, formerAttemptId); } @@ -418,6 +417,48 @@ void closeAndDeletePartitionsIfNeeded( removeAppShuffleInfoFromDB(appShuffleInfo); } + @Override + public void removeShuffleMerge(RemoveShuffleMerge msg) { + if (!appsShuffleInfo.containsKey(msg.appId)) { + logger.warn("Asked to remove merged shuffle, but application {} " + + "is not registered or NM was restarted.", msg.appId); + return; + } + AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(msg.appId); + if (!msg.appAttemptId.equals(String.valueOf(appShuffleInfo.attemptId))) { + throw new IllegalArgumentException( + String.format("The attempt id %s in this removeMergedData message does not match " + + "with 
the current attempt id %s stored in shuffle service for application %s", + msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); + } + if (!appShuffleInfo.shuffles.containsKey(msg.shuffleId)) { + logger.warn("Asked to remove Application {} unknown shuffle merged data, shuffleId = {}", + msg.appId, msg.shuffleId); + return; + } + logger.info("Removing merged shuffle for application = {}, shuffleId = {}, shuffleMergeId = {}", msg.appId, + msg.shuffleId, msg.shuffleMergeId); + AppShuffleMergePartitionsInfo mergePartitionsInfo = appShuffleInfo.shuffles.get(msg.shuffleId); + boolean deleteCurrent = msg.shuffleMergeId.equals(DELETE_CURRENT_MERGED_SHUFFLE_ID); + if (!deleteCurrent) { + if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { + throw new RuntimeException( + String.format("Asked to remove old shuffle merged data for application = %s , shuffleId = %s, " + + "shuffleMergeId = %s, but current shuffleMergeId = %s ", + msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); + } else { + // There is a higher shuffleMergeId request to clean, we also clean up older shuffleMergeId partitions. + AppAttemptShuffleMergeId toRemoveHigherShuffleMergeId = new AppAttemptShuffleMergeId( + appShuffleInfo.appId, appShuffleInfo.attemptId, msg.shuffleId, msg.shuffleMergeId); + submitCleanupTask( + () -> deleteCurrentOutDatedShufflePartitions(appShuffleInfo, msg.shuffleId, toRemoveHigherShuffleMergeId)); + } + } else { + submitCleanupTask( + () -> deleteCurrentOutDatedShufflePartitions(appShuffleInfo, msg.shuffleId)); + } + } + /** * Remove the application attempt local paths information from the DB. 
This method is being * invoked within the lock from the ConcurrentHashmap appsShuffleInfo on the specific @@ -451,23 +492,51 @@ void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) { } } + void deleteCurrentOutDatedShufflePartitions(AppShuffleInfo shuffleInfo, Integer shuffleId){ + deleteCurrentOutDatedShufflePartitions(shuffleInfo, shuffleId, null); + } + /** - * Clean up all the AppShufflePartitionInfo and the finalized shuffle partitions in DB for - * a specific shuffleMergeId. This is done since there is a higher shuffleMergeId request made - * for a shuffleId, therefore clean up older shuffleMergeId partitions. The cleanup will be - * executed the mergedShuffleCleaner thread. + * Clean up the finalized or outdated shuffle partitions. The cleanup will be executed the mergedShuffleCleaner thread. + * Two cases need to clean up the shuffle: + * 1. there is a higher shuffleMergeId request made for a shuffleId, therefore clean up older + * shuffleMergeId partitions. + * 2. Application requires to clean up the expired or unused specific shuffleId partitions */ - @VisibleForTesting - void closeAndDeleteOutdatedPartitions( - AppAttemptShuffleMergeId appAttemptShuffleMergeId, - Map partitions) { - removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId); - partitions - .forEach((partitionId, partitionInfo) -> { - synchronized (partitionInfo) { - partitionInfo.closeAllFilesAndDeleteIfNeeded(true); + void deleteCurrentOutDatedShufflePartitions( + AppShuffleInfo shuffleInfo, Integer shuffleId, + AppAttemptShuffleMergeId higherShuffleMergeIdToClean) { + AppShuffleMergePartitionsInfo mergePartitionsInfo = shuffleInfo.shuffles.get(shuffleId); + AppAttemptShuffleMergeId currentShuffleMergeId = new AppAttemptShuffleMergeId( + shuffleInfo.appId, shuffleInfo.attemptId, shuffleId, mergePartitionsInfo.shuffleMergeId); + removeAppShufflePartitionInfoFromDB(currentShuffleMergeId); + if (higherShuffleMergeIdToClean != null) { + 
removeAppShufflePartitionInfoFromDB(higherShuffleMergeIdToClean); + } + if (!mergePartitionsInfo.isFinalized()) { + mergePartitionsInfo.shuffleMergePartitions + .forEach((partitionId, partitionInfo) -> { + synchronized (partitionInfo) { + partitionInfo.closeAllFilesAndDeleteIfNeeded(true); + } + }); + } else { + Arrays.stream(mergePartitionsInfo.finalizedPartitions).forEach(partition -> { + try { + File metaFile = + shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergePartitionsInfo.shuffleMergeId, partition); + File indexFile = new File( + shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergePartitionsInfo.shuffleMergeId, partition)); + File dataFile = + shuffleInfo.getMergedShuffleDataFile(shuffleId, mergePartitionsInfo.shuffleMergeId, partition); + metaFile.delete(); + indexFile.delete(); + dataFile.delete(); + } catch (Exception e) { + logger.error("Error delete shuffle files for {}", currentShuffleMergeId, e); } }); + } } /** @@ -654,8 +723,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + deleteCurrentOutDatedShufflePartitions(appShuffleInfo, shuffleId, appAttemptShuffleMergeId)); } else { // This block covers: // 1. 
finalization of determinate stage @@ -712,6 +780,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); + appShuffleInfo.shuffles.get(msg.shuffleId).setFinalizedPartitions(Ints.toArray(reduceIds)); } logger.info("{} attempt {} shuffle {} shuffleMerge {}: finalization of shuffle merge completed", msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId); @@ -736,21 +805,21 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { Map metaMap = mapper.readValue(mergeDirInfo, typeRef); String mergeDir = metaMap.get(MERGE_DIR_KEY); int attemptId = Integer.valueOf( - metaMap.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(UNDEFINED_ATTEMPT_ID))); + metaMap.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(DEFAUT_APP_ATTEMPT_ID))); if (mergeDir == null) { throw new IllegalArgumentException( String.format("Failed to get the merge directory information from the " + "shuffleManagerMeta %s in executor registration message", shuffleManagerMeta)); } - if (attemptId == UNDEFINED_ATTEMPT_ID) { + if (attemptId == DEFAUT_APP_ATTEMPT_ID) { // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. // Only the first ExecutorRegister message can register the merge dirs. // DB will also get updated with the registered local path information. 
appsShuffleInfo.computeIfAbsent(appId, id -> { AppPathsInfo appPathsInfo = new AppPathsInfo(appId, executorInfo.localDirs, mergeDir, executorInfo.subDirsPerLocalDir); - writeAppPathsInfoToDb(appId, UNDEFINED_ATTEMPT_ID, appPathsInfo); - return new AppShuffleInfo(appId, UNDEFINED_ATTEMPT_ID, appPathsInfo); + writeAppPathsInfoToDb(appId, DEFAUT_APP_ATTEMPT_ID, appPathsInfo); + return new AppShuffleInfo(appId, DEFAUT_APP_ATTEMPT_ID, appPathsInfo); }); } else { // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. @@ -1464,6 +1533,7 @@ public static class AppShuffleMergePartitionsInfo { Collections.emptyMap(); private final int shuffleMergeId; private final Map shuffleMergePartitions; + private int[] finalizedPartitions = new int[0]; public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) { this.shuffleMergeId = shuffleMergeId; @@ -1479,6 +1549,15 @@ public Map getShuffleMergePartitions() { public boolean isFinalized() { return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER; } + + public void setFinalizedPartitions(int[] finalizedPartitions){ + assert isFinalized(); + this.finalizedPartitions = finalizedPartitions; + } + + public int[] getFinalizedPartitions() { + return finalizedPartitions; + } } /** diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index ad959c7e2e7c..33411baa09f8 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -50,7 +50,7 @@ public enum Type { FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11), PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14), FETCH_SHUFFLE_BLOCK_CHUNKS(15), 
DIAGNOSE_CORRUPTION(16), CORRUPTION_CAUSE(17), - PUSH_BLOCK_RETURN_CODE(18); + PUSH_BLOCK_RETURN_CODE(18), REMOVE_SHUFFLE_MERGE(19); private final byte id; @@ -88,6 +88,7 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) { case 16: return DiagnoseCorruption.decode(buf); case 17: return CorruptionCause.decode(buf); case 18: return BlockPushReturnCode.decode(buf); + case 19: return RemoveShuffleMerge.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java new file mode 100644 index 000000000000..7a21537b0140 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.shuffle.protocol; + +import java.util.Objects; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +import org.apache.spark.network.protocol.Encoders; + +/** + * Request to remove merged shuffle blocks. + */ +public class RemoveShuffleMerge extends BlockTransferMessage { + public final String appId; + public final String appAttemptId; + public final Integer shuffleId; + public final Integer shuffleMergeId; + + public RemoveShuffleMerge( + String appId, String appAttemptId, Integer shuffleId, Integer shuffleMergeId) { + this.appId = appId; + this.appAttemptId = appAttemptId; + this.shuffleId = shuffleId; + this.shuffleMergeId = shuffleMergeId; + } + + @Override + protected Type type() { return Type.REMOVE_SHUFFLE_MERGE; } + + @Override + public int hashCode() { + return com.google.common.base.Objects. + hashCode(appId, appAttemptId, shuffleId, shuffleMergeId); + } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) + .append("appId", appId) + .append("appAttemptId", appAttemptId) + .append("shuffleId", shuffleId) + .append("shuffleMergeId", shuffleMergeId) + .toString(); + } + + @Override + public boolean equals(Object other) { + if (other instanceof RemoveShuffleMerge) { + RemoveShuffleMerge o = (RemoveShuffleMerge) other; + return Objects.equals(appId, o.appId) + && Objects.equals(appAttemptId, o.appAttemptId) + && Objects.equals(shuffleId, o.shuffleId) + && Objects.equals(shuffleMergeId, o.shuffleMergeId); + } + return false; + } + + @Override + public int encodedLength() { + return Encoders.Strings.encodedLength(appId) + + 4 /* encoded length of appAttemptId */ + + 4 /* encoded length of shuffleId */ + + 4/* encoded length of shuffleMergeId */; + } + + @Override + public void encode(ByteBuf buf) { + Encoders.Strings.encode(buf, appId); + Encoders.Strings.encode(buf, 
appAttemptId); + buf.writeInt(shuffleId); + buf.writeInt(shuffleMergeId); + } + + public static RemoveShuffleMerge decode(ByteBuf buf) { + String appId = Encoders.Strings.decode(buf); + String appAttemptId = Encoders.Strings.decode(buf); + int shuffleId = buf.readInt(); + int shuffleMergeId = buf.readInt(); + return new RemoveShuffleMerge(appId, appAttemptId, shuffleId, shuffleMergeId); + } +} diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index eb2c1d9fa5cb..76450203535f 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -28,7 +28,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; -import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -1148,10 +1147,9 @@ public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedExce Semaphore closed = new Semaphore(0); pushResolver = new RemoteBlockPushResolver(conf, null) { @Override - void closeAndDeleteOutdatedPartitions( - AppAttemptShuffleMergeId appAttemptShuffleMergeId, - Map partitions) { - super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, partitions); + void deleteCurrentOutDatedShufflePartitions(AppShuffleInfo shuffleInfo, Integer shuffleId, + AppAttemptShuffleMergeId higherShuffleMergeIdToClean) { + super.deleteCurrentOutDatedShufflePartitions(shuffleInfo, shuffleId, higherShuffleMergeIdToClean); closed.release(); } }; diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fb3512619d87..6260c436dea3 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1397,7 +1397,7 @@ private[spark] class DAGScheduler( private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = { assert(stage.shuffleDep.shuffleMergeAllowed && !stage.shuffleDep.isShuffleMergeFinalizedMarked) if (stage.shuffleDep.getMergerLocs.isEmpty) { - getAndSetShufflePushMergerLocations(stage) + configureShufflePushMergerLocations(stage) } val shuffleId = stage.shuffleDep.shuffleId @@ -1413,17 +1413,20 @@ private[spark] class DAGScheduler( } } - private def getAndSetShufflePushMergerLocations(stage: ShuffleMapStage): Seq[BlockManagerId] = { + private def configureShufflePushMergerLocations(stage: ShuffleMapStage): Unit = { + if (stage.shuffleDep.getMergerLocs.nonEmpty) { + return + } val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations( stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId) if (mergerLocs.nonEmpty) { stage.shuffleDep.setMergerLocs(mergerLocs) } + mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId, mergerLocs) logDebug(s"Shuffle merge locations for shuffle ${stage.shuffleDep.shuffleId} with" + s" shuffle merge ${stage.shuffleDep.shuffleMergeId} is" + s" ${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}") - mergerLocs } /** Called when stage's parents are available and we can now do its task. 
*/ @@ -2621,13 +2624,13 @@ private[spark] class DAGScheduler( stage.shuffleDep.shuffleMergeAllowed && stage.shuffleDep.getMergerLocs.isEmpty && runningStages.contains(stage) }.foreach { case(_, stage: ShuffleMapStage) => - if (getAndSetShufflePushMergerLocations(stage).nonEmpty) { + configureShufflePushMergerLocations(stage) + if(stage.shuffleDep.getMergerLocs.nonEmpty) { + // stage's merger locations change from empty to non-empty logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" + s" ${stage.shuffleDep.shuffleId} and shuffle merge" + s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" + s" merger locations") - mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId, - stage.shuffleDep.getMergerLocs) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index adeb507941c0..6fee25d48d2a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -327,43 +327,52 @@ class BlockManagerMasterEndpoint( } }.toSeq - // Find all shuffle blocks on executors that are no longer running - val blocksToDeleteByShuffleService = - new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] + var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]] if (externalShuffleServiceRemoveShuffleEnabled) { - mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus => - shuffleStatus.withMapStatuses { mapStatuses => - mapStatuses.foreach { mapStatus => - // Check if the executor has been deallocated - if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { - val blocksToDel = - shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId) - if (blocksToDel.nonEmpty) { - val blocks = 
blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location, - new mutable.HashSet[BlockId]) - blocks ++= blocksToDel + val shuffleClient = externalBlockStoreClient.get + // Find all shuffle blocks on executors that are no longer running + val blocksToDelete = new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]] + mapOutputTracker.shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => + shuffleStatus.withMapStatuses { mapStatuses => + mapStatuses.foreach { mapStatus => + // Check if the executor has been deallocated + if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) { + val blocksToDel = shuffleManager.shuffleBlockResolver + .getBlocksForShuffle(shuffleId, mapStatus.mapId) + if (blocksToDel.nonEmpty) { + val blocks = blocksToDelete.getOrElseUpdate(mapStatus.location, + new mutable.HashSet[BlockId]) + blocks ++= blocksToDel + } } } } - } + removeShuffleFromShuffleServicesFutures ++= blocksToDelete.map { case (bmId, blockIds) => + Future[Boolean] { + val numRemovedBlocks = shuffleClient.removeBlocks( + bmId.host, + bmId.port, + bmId.executorId, + blockIds.map(_.toString).toArray) + numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, + TimeUnit.SECONDS) == blockIds.size + } + }.toSeq + case None => + logError(s"Asked to remove shuffle blocks from " + + s"shuffle service for unknown shuffle ${shuffleId}") } } - - val removeShuffleFromShuffleServicesFutures = - externalBlockStoreClient.map { shuffleClient => - blocksToDeleteByShuffleService.map { case (bmId, blockIds) => - Future[Boolean] { - val numRemovedBlocks = shuffleClient.removeBlocks( - bmId.host, - bmId.port, - bmId.executorId, - blockIds.map(_.toString).toArray) - numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, - TimeUnit.SECONDS) == blockIds.size - } + if (pushBasedShuffleEnabled && externalBlockStoreClient.isDefined) { + val shuffleClient = externalBlockStoreClient.get + val shuffleMergerLocations = 
mapOutputTracker.getShufflePushMergerLocations(shuffleId) + removeShuffleFromShuffleServicesFutures ++= shuffleMergerLocations.map(bmId => { + Future[Boolean] { + shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId) } - }.getOrElse(Seq.empty) - + }) + } Future.sequence(removeShuffleFromExecutorsFutures ++ removeShuffleFromShuffleServicesFutures) } From 1c26c9c29abf48398f6bfc8124553814968bed9b Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Tue, 22 Nov 2022 20:51:26 +0800 Subject: [PATCH 2/3] [SPARK-38005][core] Support cleaning up merged shuffle files and state from external shuffle service --- .../shuffle/RemoteBlockPushResolver.java | 136 ++++++++++-------- .../shuffle/RemoteBlockPushResolverSuite.java | 29 +++- 2 files changed, 98 insertions(+), 67 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index db3de0902296..15dcc6fd975f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -227,14 +228,13 @@ AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( // Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being // run for the shuffle ID. 
Close and clean up old shuffleMergeId files, // happens in the indeterminate stage retries - AppAttemptShuffleMergeId appAttemptShuffleMergeId = + AppAttemptShuffleMergeId higherShuffleMergeId = new AppAttemptShuffleMergeId( appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId); logger.info("{}: creating a new shuffle merge metadata since received " + "shuffleMergeId is higher than latest shuffleMergeId {}", - appAttemptShuffleMergeId, latestShuffleMergeId); - submitCleanupTask(() -> - deleteCurrentOutDatedShufflePartitions(appShuffleInfo, shuffleId, appAttemptShuffleMergeId)); + higherShuffleMergeId, latestShuffleMergeId); + submitRemoveShuffleMergeTask(appShuffleInfo, shuffleId, Optional.of(higherShuffleMergeId)); return new AppShuffleMergePartitionsInfo(shuffleMergeId, false); } else { // The request is for block with same shuffleMergeId as the latest shuffleMergeId @@ -440,22 +440,18 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { msg.shuffleId, msg.shuffleMergeId); AppShuffleMergePartitionsInfo mergePartitionsInfo = appShuffleInfo.shuffles.get(msg.shuffleId); boolean deleteCurrent = msg.shuffleMergeId.equals(DELETE_CURRENT_MERGED_SHUFFLE_ID); - if (!deleteCurrent) { - if (msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { - throw new RuntimeException( - String.format("Asked to remove old shuffle merged data for application = %s , shuffleId = %s, " - + "shuffleMergeId = %s, but current shuffleMergeId = %s ", - msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); - } else { - // There is a higher shuffleMergeId request to clean, we also clean up older shuffleMergeId partitions. 
- AppAttemptShuffleMergeId toRemoveHigherShuffleMergeId = new AppAttemptShuffleMergeId( - appShuffleInfo.appId, appShuffleInfo.attemptId, msg.shuffleId, msg.shuffleMergeId); - submitCleanupTask( - () -> deleteCurrentOutDatedShufflePartitions(appShuffleInfo, msg.shuffleId, toRemoveHigherShuffleMergeId)); - } + if (!deleteCurrent && msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { + throw new RuntimeException( + String.format("Asked to remove old shuffle merged data for application = %s , shuffleId = %s, " + + "shuffleMergeId = %s, but current shuffleMergeId = %s ", + msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); + } else if (!deleteCurrent && msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { + // There is a higher shuffleMergeId request to clean, we also clean up older shuffleMergeId partitions. + Optional toRemoveHigherShuffleMergeId = Optional.of(new AppAttemptShuffleMergeId( + appShuffleInfo.appId, appShuffleInfo.attemptId, msg.shuffleId, msg.shuffleMergeId)); + submitRemoveShuffleMergeTask(appShuffleInfo, msg.shuffleId, toRemoveHigherShuffleMergeId); } else { - submitCleanupTask( - () -> deleteCurrentOutDatedShufflePartitions(appShuffleInfo, msg.shuffleId)); + submitRemoveShuffleMergeTask(appShuffleInfo, msg.shuffleId, Optional.empty()); } } @@ -492,53 +488,72 @@ void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) { } } - void deleteCurrentOutDatedShufflePartitions(AppShuffleInfo shuffleInfo, Integer shuffleId){ - deleteCurrentOutDatedShufflePartitions(shuffleInfo, shuffleId, null); - } - /** - * Clean up the finalized or outdated shuffle partitions. The cleanup will be executed the mergedShuffleCleaner thread. - * Two cases need to clean up the shuffle: - * 1. there is a higher shuffleMergeId request made for a shuffleId, therefore clean up older - * shuffleMergeId partitions. - * 2. 
Application requires to clean up the expired or unused specific shuffleId partitions + * Clean up the outdated finalized or unfinalized shuffle partitions. The cleanup will be executed in the + * mergedShuffleCleaner thread. Two cases need to clean up the shuffle: 1. there is a higher shuffleMergeId request + * made for a shuffleId, therefore clean up older shuffleMergeId partitions. 2. Application requires to clean up the + * expired or unused specific shuffleId partitions */ - void deleteCurrentOutDatedShufflePartitions( + @VisibleForTesting + void submitRemoveShuffleMergeTask( AppShuffleInfo shuffleInfo, Integer shuffleId, - AppAttemptShuffleMergeId higherShuffleMergeIdToClean) { + Optional higherShuffleMergeId) { AppShuffleMergePartitionsInfo mergePartitionsInfo = shuffleInfo.shuffles.get(shuffleId); - AppAttemptShuffleMergeId currentShuffleMergeId = new AppAttemptShuffleMergeId( + AppAttemptShuffleMergeId shuffleMergeId = new AppAttemptShuffleMergeId( shuffleInfo.appId, shuffleInfo.attemptId, shuffleId, mergePartitionsInfo.shuffleMergeId); - removeAppShufflePartitionInfoFromDB(currentShuffleMergeId); - if (higherShuffleMergeIdToClean != null) { - removeAppShufflePartitionInfoFromDB(higherShuffleMergeIdToClean); - } if (!mergePartitionsInfo.isFinalized()) { - mergePartitionsInfo.shuffleMergePartitions - .forEach((partitionId, partitionInfo) -> { - synchronized (partitionInfo) { - partitionInfo.closeAllFilesAndDeleteIfNeeded(true); - } - }); + Map partitionsToClean = mergePartitionsInfo.shuffleMergePartitions; + submitCleanupTask(() -> + closeAndDeleteOutdatedPartitions(shuffleMergeId, partitionsToClean, higherShuffleMergeId)); } else { - Arrays.stream(mergePartitionsInfo.finalizedPartitions).forEach(partition -> { - try { - File metaFile = - shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergePartitionsInfo.shuffleMergeId, partition); - File indexFile = new File( - shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergePartitionsInfo.shuffleMergeId, 
partition)); - File dataFile = - shuffleInfo.getMergedShuffleDataFile(shuffleId, mergePartitionsInfo.shuffleMergeId, partition); - metaFile.delete(); - indexFile.delete(); - dataFile.delete(); - } catch (Exception e) { - logger.error("Error delete shuffle files for {}", currentShuffleMergeId, e); - } - }); + int[] partitionsToClean = mergePartitionsInfo.finalizedPartitions; + submitCleanupTask(() -> + deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, partitionsToClean, higherShuffleMergeId)); } } + @VisibleForTesting + void closeAndDeleteOutdatedPartitions( + AppAttemptShuffleMergeId shuffleMergeId, + Map partitions, + Optional higherShuffleMergeId) { + removeAppShufflePartitionInfoFromDB(shuffleMergeId); + higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB); + partitions + .forEach((partitionId, partitionInfo) -> { + synchronized (partitionInfo) { + partitionInfo.closeAllFilesAndDeleteIfNeeded(true); + } + }); + } + + @VisibleForTesting + void deleteOutdatedFinalizedPartitions( + AppShuffleInfo shuffleInfo, + AppAttemptShuffleMergeId shuffleMergeId, + int[] outdatedFinalizedPartitions, + Optional higherShuffleMergeId) { + int shuffleId = shuffleMergeId.shuffleId; + int mergeId = shuffleMergeId.shuffleMergeId; + removeAppShufflePartitionInfoFromDB(shuffleMergeId); + higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB); + Arrays.stream(outdatedFinalizedPartitions).forEach(partition -> { + try { + File metaFile = + shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId, partition); + File indexFile = new File( + shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId, partition)); + File dataFile = + shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId, partition); + metaFile.delete(); + indexFile.delete(); + dataFile.delete(); + } catch (Exception e) { + logger.error("Error delete shuffle files for {}", shuffleMergeId, e); + } + }); + } + /** * Remove the finalized shuffle partition information for a 
specific appAttemptShuffleMergeId * @param appAttemptShuffleMergeId @@ -722,8 +737,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. - submitCleanupTask(() -> - deleteCurrentOutDatedShufflePartitions(appShuffleInfo, shuffleId, appAttemptShuffleMergeId)); + submitRemoveShuffleMergeTask(appShuffleInfo, shuffleId, Optional.of(appAttemptShuffleMergeId)); } else { // This block covers: // 1. finalization of determinate stage @@ -1766,9 +1780,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) { try { if (dataChannel.isOpen()) { dataChannel.close(); - if (delete) { - dataFile.delete(); - } + } + if (delete) { + dataFile.delete(); } } catch (IOException ioe) { logger.warn("Error closing data channel for {} reduceId {}", diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 76450203535f..7bdbb446af17 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -28,12 +28,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; - import org.apache.commons.io.FileUtils; import org.junit.After; import org.junit.Assert; @@ -42,8 +44,11 @@ import org.roaringbitmap.RoaringBitmap; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.client.StreamCallbackWithID; @@ -1147,9 +1152,21 @@ public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedExce Semaphore closed = new Semaphore(0); pushResolver = new RemoteBlockPushResolver(conf, null) { @Override - void deleteCurrentOutDatedShufflePartitions(AppShuffleInfo shuffleInfo, Integer shuffleId, - AppAttemptShuffleMergeId higherShuffleMergeIdToClean) { - super.deleteCurrentOutDatedShufflePartitions(shuffleInfo, shuffleId, higherShuffleMergeIdToClean); + void closeAndDeleteOutdatedPartitions( + AppAttemptShuffleMergeId shuffleMergeId, + Map partitions, + Optional higherShuffleMergeId) { + super.closeAndDeleteOutdatedPartitions(shuffleMergeId, partitions, higherShuffleMergeId); + closed.release(); + } + @VisibleForTesting + void deleteOutdatedFinalizedPartitions( + AppShuffleInfo shuffleInfo, + AppAttemptShuffleMergeId shuffleMergeId, + int[] outdatedFinalizedPartitions, + Optional higherShuffleMergeId) { + super.deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, + outdatedFinalizedPartitions, higherShuffleMergeId); closed.release(); } }; From ea866147b9617f9a305a1ead31c6019cbed248bf Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Thu, 24 Nov 2022 18:21:49 +0800 Subject: [PATCH 3/3] improve code style and fix comments --- .../network/util/PushBasedShuffleUtils.java | 6 +- .../shuffle/ExternalBlockStoreClient.java | 3 +- .../shuffle/MergedShuffleFileManager.java | 3 +- .../shuffle/RemoteBlockPushResolver.java | 80 ++++++++------ .../shuffle/protocol/RemoveShuffleMerge.java | 4 +- 
.../shuffle/RemoteBlockPushResolverSuite.java | 101 +++++++++++++----- .../storage/BlockManagerMasterEndpoint.scala | 42 +++++--- .../yarn/YarnShuffleServiceSuite.scala | 2 +- 8 files changed, 157 insertions(+), 84 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java index 235e1150618c..a7cd21c81437 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/PushBasedShuffleUtils.java @@ -20,9 +20,9 @@ public class PushBasedShuffleUtils { /** - * Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple application attempts. - * This variable is derived from the String typed appAttemptId. If no appAttemptId is set, the default - * comparableAppAttemptId is -1. + * Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple + * application attempts. This variable is derived from the String typed appAttemptId. + * If no appAttemptId is set, the default comparableAppAttemptId is -1. */ public static final int DEFAUT_APP_ATTEMPT_ID = -1; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index 80374f756224..504df47de1bd 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -326,7 +326,8 @@ public Boolean removeShuffleMerge(String host, int port, int shuffleId) { try { checkInit(); TransportClient client = clientFactory.createClient(host, port); - String appAttemptId = getAppAttemptId() == null ? 
String.valueOf(DEFAUT_APP_ATTEMPT_ID) : getAppAttemptId(); + String appAttemptId = + getAppAttemptId() == null ? String.valueOf(DEFAUT_APP_ATTEMPT_ID) : getAppAttemptId(); ByteBuffer removeMessage = new RemoveShuffleMerge(appId, appAttemptId, shuffleId, DELETE_CURRENT_MERGED_SHUFFLE_ID).toByteBuffer(); client.send(removeMessage); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java index d50921d3abd0..e70e365cb97a 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java @@ -125,7 +125,8 @@ MergedBlockMeta getMergedBlockMeta( /** * Handles the request to remove shuffle merge files. * - * @param msg contains shuffle details (appId, shuffleId, etc) to uniquely identify a shuffle to be removed + * @param msg contains shuffle details (appId, shuffleId, etc) to uniquely identify + * a shuffle to be removed */ void removeShuffleMerge(RemoveShuffleMerge msg); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 15dcc6fd975f..8beef207b1f9 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -234,7 +234,8 @@ AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( logger.info("{}: creating a new shuffle merge metadata since received " + "shuffleMergeId is higher than latest shuffleMergeId {}", higherShuffleMergeId, latestShuffleMergeId); - submitRemoveShuffleMergeTask(appShuffleInfo, shuffleId, 
Optional.of(higherShuffleMergeId)); + submitRemoveShuffleMergeTask( + appShuffleInfo, shuffleId, Optional.of(higherShuffleMergeId)); return new AppShuffleMergePartitionsInfo(shuffleMergeId, false); } else { // The request is for block with same shuffleMergeId as the latest shuffleMergeId @@ -420,7 +421,7 @@ void closeAndDeletePartitionsIfNeeded( @Override public void removeShuffleMerge(RemoveShuffleMerge msg) { if (!appsShuffleInfo.containsKey(msg.appId)) { - logger.warn("Asked to remove merged shuffle, but application {} " + + logger.debug("Asked to remove merged shuffle, but application {} " + "is not registered or NM was restarted.", msg.appId); return; } @@ -432,23 +433,25 @@ public void removeShuffleMerge(RemoveShuffleMerge msg) { msg.appAttemptId, appShuffleInfo.attemptId, msg.appId)); } if (!appShuffleInfo.shuffles.containsKey(msg.shuffleId)) { - logger.warn("Asked to remove Application {} unknown shuffle merged data, shuffleId = {}", + logger.debug("Asked to remove Application {} unknown shuffle merged data, shuffleId = {}", msg.appId, msg.shuffleId); return; } - logger.info("Removing merged shuffle for application = {}, shuffleId = {}, shuffleMergeId = {}", msg.appId, - msg.shuffleId, msg.shuffleMergeId); + logger.info("Removing merged shuffle for application = {}, shuffleId = {}, shuffleMergeId = {}", + msg.appId, msg.shuffleId, msg.shuffleMergeId); AppShuffleMergePartitionsInfo mergePartitionsInfo = appShuffleInfo.shuffles.get(msg.shuffleId); boolean deleteCurrent = msg.shuffleMergeId.equals(DELETE_CURRENT_MERGED_SHUFFLE_ID); if (!deleteCurrent && msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) { throw new RuntimeException( - String.format("Asked to remove old shuffle merged data for application = %s , shuffleId = %s, " - + "shuffleMergeId = %s, but current shuffleMergeId = %s ", + String.format("Asked to remove old shuffle merged data for application = %s ," + + " shuffleId = %s, shuffleMergeId = %s, but current shuffleMergeId = %s ", 
msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId)); } else if (!deleteCurrent && msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { - // There is a higher shuffleMergeId request to clean, we also clean up older shuffleMergeId partitions. - Optional toRemoveHigherShuffleMergeId = Optional.of(new AppAttemptShuffleMergeId( - appShuffleInfo.appId, appShuffleInfo.attemptId, msg.shuffleId, msg.shuffleMergeId)); + // There is a higher shuffleMergeId request to clean, but also clean up older + // shuffleMergeId partitions. + Optional toRemoveHigherShuffleMergeId = + Optional.of(new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, + msg.shuffleId, msg.shuffleMergeId)); submitRemoveShuffleMergeTask(appShuffleInfo, msg.shuffleId, toRemoveHigherShuffleMergeId); } else { submitRemoveShuffleMergeTask(appShuffleInfo, msg.shuffleId, Optional.empty()); @@ -489,26 +492,30 @@ void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) { } /** - * Clean up the outdated finalized or unfinalized shuffle partitions. The cleanup will be executed in the - * mergedShuffleCleaner thread. Two cases need to clean up the shuffle: 1. there is a higher shuffleMergeId request - * made for a shuffleId, therefore clean up older shuffleMergeId partitions. 2. Application requires to clean up the - * expired or unused specific shuffleId partitions + * Clean up the outdated finalized or unfinalized shuffle partitions. + * The cleanup will be executed in the mergedShuffleCleaner thread. Two cases: + * 1. there is a higher shuffleMergeId request made for a shuffleId, therefore clean up + * older shuffleMergeId partitions. + * 2. 
Application requires to clean up the expired or unused specific shuffleId partitions */ @VisibleForTesting void submitRemoveShuffleMergeTask( AppShuffleInfo shuffleInfo, Integer shuffleId, - Optional higherShuffleMergeId) { + Optional higherShuffleMergeIdToClean) { AppShuffleMergePartitionsInfo mergePartitionsInfo = shuffleInfo.shuffles.get(shuffleId); AppAttemptShuffleMergeId shuffleMergeId = new AppAttemptShuffleMergeId( shuffleInfo.appId, shuffleInfo.attemptId, shuffleId, mergePartitionsInfo.shuffleMergeId); + // Before submitting an asynchronous task, partitions should be extracted first if (!mergePartitionsInfo.isFinalized()) { - Map partitionsToClean = mergePartitionsInfo.shuffleMergePartitions; + Map partitionsToClean = mergePartitionsInfo.shuffleMergePartitions; submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions(shuffleMergeId, partitionsToClean, higherShuffleMergeId)); + closeAndDeleteOutdatedPartitions(shuffleMergeId, partitionsToClean, + higherShuffleMergeIdToClean)); } else { - int[] partitionsToClean = mergePartitionsInfo.finalizedPartitions; + int[] partitionsToClean = mergePartitionsInfo.finalizedPartitionsForClean; submitCleanupTask(() -> - deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, partitionsToClean, higherShuffleMergeId)); + deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, + partitionsToClean, higherShuffleMergeIdToClean)); } } @@ -539,17 +546,23 @@ void deleteOutdatedFinalizedPartitions( higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB); Arrays.stream(outdatedFinalizedPartitions).forEach(partition -> { try { - File metaFile = - shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId, partition); - File indexFile = new File( - shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId, partition)); - File dataFile = - shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId, partition); + File metaFile = shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId, partition); 
metaFile.delete(); + } catch (Exception e) { + logger.warn("Error deleting meta file for {}", shuffleMergeId, e); + } + try { + File indexFile = + new File(shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId, partition)); indexFile.delete(); + } catch (Exception e) { + logger.warn("Error deleting index file for {}", shuffleMergeId, e); + } + try { + File dataFile = shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId, partition); dataFile.delete(); } catch (Exception e) { - logger.error("Error delete shuffle files for {}", shuffleMergeId, e); + logger.warn("Error deleting dataFile file for {}", shuffleMergeId, e); } }); } @@ -737,7 +750,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. - submitRemoveShuffleMergeTask(appShuffleInfo, shuffleId, Optional.of(appAttemptShuffleMergeId)); + submitRemoveShuffleMergeTask(appShuffleInfo, shuffleId, + Optional.of(appAttemptShuffleMergeId)); } else { // This block covers: // 1. finalization of determinate stage @@ -1547,12 +1561,14 @@ public static class AppShuffleMergePartitionsInfo { Collections.emptyMap(); private final int shuffleMergeId; private final Map shuffleMergePartitions; - private int[] finalizedPartitions = new int[0]; + // Mark the finalized partitions need to be cleaned when removing merged shuffle later + private int[] finalizedPartitionsForClean; public AppShuffleMergePartitionsInfo(int shuffleMergeId, boolean shuffleFinalized) { this.shuffleMergeId = shuffleMergeId; this.shuffleMergePartitions = shuffleFinalized ? 
SHUFFLE_FINALIZED_MARKER : new ConcurrentHashMap<>(); + this.finalizedPartitionsForClean = new int[0]; } @VisibleForTesting @@ -1564,13 +1580,9 @@ public boolean isFinalized() { return shuffleMergePartitions == SHUFFLE_FINALIZED_MARKER; } - public void setFinalizedPartitions(int[] finalizedPartitions){ + public void setFinalizedPartitions(int[] finalizedPartitionsForClean){ assert isFinalized(); - this.finalizedPartitions = finalizedPartitions; - } - - public int[] getFinalizedPartitions() { - return finalizedPartitions; + this.finalizedPartitionsForClean = finalizedPartitionsForClean; } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java index 7a21537b0140..798457655119 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RemoveShuffleMerge.java @@ -47,8 +47,8 @@ public RemoveShuffleMerge( @Override public int hashCode() { - return com.google.common.base.Objects. 
- hashCode(appId, appAttemptId, shuffleId, shuffleMergeId); + return com.google.common.base.Objects + .hashCode(appId, appAttemptId, shuffleId, shuffleMergeId); } @Override diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 7bdbb446af17..2532c3f6c4c3 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; @@ -60,6 +61,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge; import org.apache.spark.network.shuffle.protocol.MergeStatuses; import org.apache.spark.network.shuffle.protocol.PushBlockStream; +import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge; import org.apache.spark.network.util.MapConfigProvider; import org.apache.spark.network.util.TransportConf; @@ -1150,28 +1152,8 @@ public void testBlockFetchWithOlderShuffleMergeId() throws IOException { @Test public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedException { Semaphore closed = new Semaphore(0); - pushResolver = new RemoteBlockPushResolver(conf, null) { - @Override - void closeAndDeleteOutdatedPartitions( - AppAttemptShuffleMergeId shuffleMergeId, - Map partitions, - Optional higherShuffleMergeId) { - super.closeAndDeleteOutdatedPartitions(shuffleMergeId, partitions, higherShuffleMergeId); - closed.release(); - } - @VisibleForTesting - void deleteOutdatedFinalizedPartitions( - AppShuffleInfo shuffleInfo, - 
AppAttemptShuffleMergeId shuffleMergeId, - int[] outdatedFinalizedPartitions, - Optional higherShuffleMergeId) { - super.deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, - outdatedFinalizedPartitions, higherShuffleMergeId); - closed.release(); - } - }; String testApp = "testCleanupOlderShuffleMergeId"; - registerExecutor(testApp, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); + prepareForRemoveShuffle(closed, testApp); StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 1, 0, 0, 0)); @@ -1181,7 +1163,7 @@ void deleteOutdatedFinalizedPartitions( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 2, 0, 0, 0)); RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo = pushResolver.validateAndGetAppShuffleInfo(testApp); - closed.acquire(); + closed.tryAcquire(10, TimeUnit.SECONDS); assertFalse("Data files on the disk should be cleaned up", appShuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); assertFalse("Meta files on the disk should be cleaned up", @@ -1200,7 +1182,7 @@ void deleteOutdatedFinalizedPartitions( StreamCallbackWithID stream3 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 3, 0, 0, 0)); - closed.acquire(); + closed.tryAcquire(10, TimeUnit.SECONDS); stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2])); stream3.onComplete(stream3.getID()); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 3)); @@ -1210,7 +1192,7 @@ void deleteOutdatedFinalizedPartitions( StreamCallbackWithID stream4 = pushResolver.receiveBlockDataAsStream( new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 4, 0, 0, 0)); - closed.acquire(); + closed.tryAcquire(10, TimeUnit.SECONDS); // Do not finalize shuffleMergeId 4 can happen during stage cancellation. 
stream4.onData(stream4.getID(), ByteBuffer.wrap(new byte[2])); stream4.onComplete(stream4.getID()); @@ -1218,7 +1200,7 @@ void deleteOutdatedFinalizedPartitions( // Check whether the data is cleaned up when higher shuffleMergeId finalize request comes // but no blocks pushed for that shuffleMergeId pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 5)); - closed.acquire(); + closed.tryAcquire(10, TimeUnit.SECONDS); assertFalse("MergedBlock meta file for shuffle 0 and shuffleMergeId 4 should be cleaned" + " up", appShuffleInfo.getMergedShuffleMetaFile(0, 4, 0).exists()); assertFalse("MergedBlock index file for shuffle 0 and shuffleMergeId 4 should be cleaned" @@ -1227,6 +1209,43 @@ void deleteOutdatedFinalizedPartitions( + " up", appShuffleInfo.getMergedShuffleDataFile(0, 4, 0).exists()); } + @Test + public void testRemoveShuffleMerge() throws IOException, InterruptedException { + Semaphore closed = new Semaphore(0); + String testApp = "testRemoveShuffleMerge"; + prepareForRemoveShuffle(closed, testApp); + + // 1. Check whether the data is cleaned up when merged shuffle is finalized + RemoteBlockPushResolver.AppShuffleInfo shuffleInfo = + pushResolver.validateAndGetAppShuffleInfo(testApp); + createShuffleData(testApp, 0, 1, 0, 0); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1)); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists()); + assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); + assertTrue(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); + pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, String.valueOf(NO_ATTEMPT_ID), 0, 1)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists()); + assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 0)).exists()); + assertFalse(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists()); + + // 2. 
Check whether the data is cleaned up when merged shuffle is not finalized. + createShuffleData(testApp, 2, 1, 0, 0); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists()); + pushResolver.removeShuffleMerge( + new RemoveShuffleMerge(testApp, String.valueOf(NO_ATTEMPT_ID), 2, 1)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists()); + + // 3. Check whether the data is cleaned up when higher shuffleMergeId finalize request comes + createShuffleData(testApp, 3, 1, 0, 0); + assertTrue(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 3, 2)); + closed.tryAcquire(10, TimeUnit.SECONDS); + assertFalse(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists()); + } + @Test public void testFinalizationResultIsEmptyWhenTheServerDidNotReceiveAnyBlocks() { //shuffle 1 0 is finalized even though the server didn't receive any blocks for it. 
@@ -1356,6 +1375,38 @@ AppShufflePartitionInfo newAppShufflePartitionInfo( registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); } + private void prepareForRemoveShuffle(Semaphore closed, String appId) throws IOException { + pushResolver = new RemoteBlockPushResolver(conf, null) { + @Override + void closeAndDeleteOutdatedPartitions( + AppAttemptShuffleMergeId shuffleMergeId, Map partitions, + Optional higherShuffleMergeId) { + super.closeAndDeleteOutdatedPartitions(shuffleMergeId, partitions, higherShuffleMergeId); + closed.release(); + } + @VisibleForTesting + void deleteOutdatedFinalizedPartitions( + AppShuffleInfo shuffleInfo, AppAttemptShuffleMergeId shuffleMergeId, + int[] outdatedFinalizedPartitions, + Optional higherShuffleMergeId) { + super.deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId, + outdatedFinalizedPartitions, higherShuffleMergeId); + closed.release(); + } + }; + registerExecutor(appId, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META); + } + + private StreamCallbackWithID createShuffleData(String appId, int shuffleId, + int shuffleMergedId, int mapIndex, int reduceId) throws IOException { + StreamCallbackWithID streamCallback = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(appId, NO_ATTEMPT_ID, shuffleId, + shuffleMergedId, mapIndex, reduceId, 0)); + streamCallback.onData(streamCallback.getID(), ByteBuffer.wrap(new byte[2])); + streamCallback.onComplete(streamCallback.getID()); + return streamCallback; + } + private Path[] createLocalDirs(int numLocalDirs) throws IOException { Path[] localDirs = new Path[numLocalDirs]; for (int i = 0; i < localDirs.length; i++) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 6fee25d48d2a..55f019773910 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -319,15 +319,23 @@ class BlockManagerMasterEndpoint( } private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = { - val removeMsg = RemoveShuffle(shuffleId) - val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => - bm.storageEndpoint.ask[Boolean](removeMsg).recover { - // use false as default value means no shuffle data were removed - handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) + var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]] + if (pushBasedShuffleEnabled && externalBlockStoreClient.isDefined) { + val shuffleClient = externalBlockStoreClient.get + mapOutputTracker.shuffleStatuses.get(shuffleId) match { + case Some(shuffleStatus) => + val shuffleMergerLocations = shuffleStatus.getShufflePushMergerLocations + removeShuffleFromShuffleServicesFutures ++= shuffleMergerLocations.map(bmId => { + Future[Boolean] { + shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId) + } + }) + case None => + logWarning(s"Asked to remove merge shuffle blocks from " + + s"shuffle service for unknown shuffle ${shuffleId}") } - }.toSeq + } - var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]] if (externalShuffleServiceRemoveShuffleEnabled) { val shuffleClient = externalBlockStoreClient.get // Find all shuffle blocks on executors that are no longer running @@ -360,19 +368,19 @@ class BlockManagerMasterEndpoint( } }.toSeq case None => - logError(s"Asked to remove shuffle blocks from " + + logDebug(s"Asked to remove shuffle blocks from " + s"shuffle service for unknown shuffle ${shuffleId}") } } - if (pushBasedShuffleEnabled && externalBlockStoreClient.isDefined) { - val shuffleClient = externalBlockStoreClient.get - val shuffleMergerLocations = mapOutputTracker.getShufflePushMergerLocations(shuffleId) - removeShuffleFromShuffleServicesFutures ++= shuffleMergerLocations.map(bmId => { - 
Future[Boolean] { - shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId) - } - }) - } + // This must be invoked last, so that the shuffleStatuses in mapOutputTracker are not + // cleaned up before they are used by [[removeShuffleFromShuffleServicesFutures]] + val removeMsg = RemoveShuffle(shuffleId) + val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm => + bm.storageEndpoint.ask[Boolean](removeMsg).recover { + // use false as default value means no shuffle data were removed + handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) + } + }.toSeq Future.sequence(removeShuffleFromExecutorsFutures ++ removeShuffleFromShuffleServicesFutures) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 16fa42056921..075a21c399e0 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -956,7 +956,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers { assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload( mergeManager2, mergeManager2DB) == 1) assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload( - mergeManager2, mergeManager2DB) == 2) + mergeManager2, mergeManager2DB) == 1) s2.stop() // Yarn Shuffle service comes back up without custom mergeManager