From 38dcff19a8e9cec615bc8074ff1ebabecd57cdf9 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 19 Jun 2024 13:29:46 +0800 Subject: [PATCH] [#1608] feat(spark3): Ensure the compatibility of reassign and stageRetry (#1783) ### What changes were proposed in this pull request? Ensure the compatibility of reassign and stageRetry. (A condensed sketch of the key handle-unwrapping change is shown after the diff below.) ### Why are the changes needed? To improve job stability when reassign and stage retry are enabled together. For #1608 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests --- .../manager/RssShuffleManagerBase.java | 34 ++++++--- .../spark/shuffle/RssShuffleManager.java | 11 ++- .../spark/shuffle/RssShuffleManager.java | 17 +++-- .../test/ReassignAndStageRetryTest.java | 71 +++++++++++++++++++ .../MockedShuffleServerGrpcService.java | 15 ++++ 5 files changed, 123 insertions(+), 25 deletions(-) create mode 100644 integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java index 197035e3a3..25acf1f0fe 100644 --- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java +++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java @@ -117,7 +117,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac protected boolean blockIdSelfManagedEnabled; - protected boolean taskBlockSendFailureRetryEnabled; + protected boolean partitionReassignEnabled; protected boolean shuffleManagerRpcServiceEnabled; @@ -638,7 +638,7 @@ public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId) { @Override public int getMaxFetchFailures() { final String TASK_MAX_FAILURE = "spark.task.maxFailures"; - return Math.max(1, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1); + return Math.max(0, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1); } /** @@ -701,6 +701,10 @@ public boolean reassignOnStageResubmit( (StageAttemptShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId); stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo); rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId, stageIdAndAttempt, true); + LOG.info( + "The stage retry has been triggered successfully for the stageId: {}, attemptNumber: {}", + stageId, + stageAttemptNumber); return true; } else { LOG.info( @@ -720,12 +724,22 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure( int shuffleId, Map> partitionToFailureServers) { long startTime = System.currentTimeMillis(); - MutableShuffleHandleInfo handleInfo = - (MutableShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId); - synchronized (handleInfo) { + ShuffleHandleInfo handleInfo = shuffleHandleInfoManager.get(shuffleId); + MutableShuffleHandleInfo internalHandle = null; + if (handleInfo instanceof MutableShuffleHandleInfo) { + internalHandle = (MutableShuffleHandleInfo) handleInfo; + } else if (handleInfo instanceof StageAttemptShuffleHandleInfo) { + internalHandle = + (MutableShuffleHandleInfo) ((StageAttemptShuffleHandleInfo) handleInfo).getCurrent(); + } + if (internalHandle == null) { + throw new RssException( + "An unexpected error occurred: internalHandle is null, which should not happen"); + } + synchronized (internalHandle) { // If the reassignment servers for one partition exceeds the max reassign server num, // it should fast fail.
- handleInfo.checkPartitionReassignServerNum( + internalHandle.checkPartitionReassignServerNum( partitionToFailureServers.keySet(), partitionReassignMaxServerNum); Map> newServerToPartitions = new HashMap<>(); @@ -740,10 +754,10 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure( String serverId = receivingFailureServer.getServerId(); boolean serverHasReplaced = false; - Set replacements = handleInfo.getReplacements(serverId); + Set replacements = internalHandle.getReplacements(serverId); if (CollectionUtils.isEmpty(replacements)) { final int requiredServerNum = 1; - Set excludedServers = new HashSet<>(handleInfo.listExcludedServers()); + Set excludedServers = new HashSet<>(internalHandle.listExcludedServers()); excludedServers.add(serverId); replacements = reassignServerForTask( @@ -759,7 +773,7 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure( } Set updatedReassignServers = - handleInfo.updateAssignment(partitionId, serverId, replacements); + internalHandle.updateAssignment(partitionId, serverId, replacements); reassignResult .computeIfAbsent(serverId, x -> new HashMap<>()) @@ -788,7 +802,7 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure( System.currentTimeMillis() - startTime, reassignResult); - return handleInfo; + return internalHandle; } } diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index eb48b2f38d..974b998621 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -168,11 +168,10 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) { this.rssResubmitStage = rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) && RssSparkShuffleUtils.isStageResubmitSupported(); - this.taskBlockSendFailureRetryEnabled = - rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED); + this.partitionReassignEnabled = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED); this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED); this.shuffleManagerRpcServiceEnabled = - taskBlockSendFailureRetryEnabled || rssResubmitStage || blockIdSelfManagedEnabled; + partitionReassignEnabled || rssResubmitStage || blockIdSelfManagedEnabled; if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) { if (isDriver) { heartBeatScheduledExecutorService = @@ -341,7 +340,7 @@ public ShuffleHandle registerShuffle( StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo = new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, handleInfo); shuffleHandleInfoManager.register(shuffleId, stageAttemptShuffleHandleInfo); - } else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) { + } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) { ShuffleHandleInfo shuffleHandleInfo = new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage); shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo); @@ -410,7 +409,7 @@ public ShuffleWriter getWriter( if (shuffleManagerRpcServiceEnabled && rssResubmitStage) { // In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId); - } else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) { + } else if (shuffleManagerRpcServiceEnabled 
&& partitionReassignEnabled) { // In Block Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId); } else { @@ -483,7 +482,7 @@ public ShuffleReader getReader( if (shuffleManagerRpcServiceEnabled && rssResubmitStage) { // In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId. shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId); - } else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) { + } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) { // In Block Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId); } else { diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index 6354e17a2a..9e6f2a26f2 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -182,20 +182,19 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) { this.rssResubmitStage = rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) && RssSparkShuffleUtils.isStageResubmitSupported(); - this.taskBlockSendFailureRetryEnabled = - rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED); + this.partitionReassignEnabled = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED); // The feature of partition reassign is exclusive with multiple replicas and stage retry. - if (taskBlockSendFailureRetryEnabled) { - if (rssResubmitStage || dataReplica > 1) { + if (partitionReassignEnabled) { + if (dataReplica > 1) { throw new RssException( - "The feature of partition reassign is incompatible with multiple replicas and stage retry."); + "The feature of task partition reassign is incompatible with multiple replicas mechanism."); } } this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED); this.shuffleManagerRpcServiceEnabled = - taskBlockSendFailureRetryEnabled || rssResubmitStage || blockIdSelfManagedEnabled; + partitionReassignEnabled || rssResubmitStage || blockIdSelfManagedEnabled; if (isDriver) { heartBeatScheduledExecutorService = ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat"); @@ -455,7 +454,7 @@ public ShuffleHandle registerShuffle( StageAttemptShuffleHandleInfo handleInfo = new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, shuffleHandleInfo); shuffleHandleInfoManager.register(shuffleId, handleInfo); - } else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) { + } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) { ShuffleHandleInfo shuffleHandleInfo = new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage); shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo); @@ -496,7 +495,7 @@ public ShuffleWriter getWriter( if (shuffleManagerRpcServiceEnabled && rssResubmitStage) { // In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId. 
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId); - } else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) { + } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) { // In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId. shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId); } else { @@ -640,7 +639,7 @@ public ShuffleReader getReaderImpl( if (shuffleManagerRpcServiceEnabled && rssResubmitStage) { // In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId. shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId); - } else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) { + } else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) { // In Block Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId. shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId); } else { diff --git a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java new file mode 100644 index 0000000000..663a176c87 --- /dev/null +++ b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ReassignAndStageRetryTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.test; + +import org.apache.spark.SparkConf; + +import org.apache.uniffle.server.MockedGrpcServer; +import org.apache.uniffle.server.ShuffleServer; +import org.apache.uniffle.server.buffer.ShuffleBufferManager; + +import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES; +import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER; +import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX; +import static org.apache.uniffle.client.util.RssClientConfig.RSS_RESUBMIT_STAGE; +import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED; + +/** + * This class tests the compatibility of the reassign and stage retry mechanisms when they are + * enabled at the same time. + */ +public class ReassignAndStageRetryTest extends PartitionBlockDataReassignMultiTimesTest { + + @Override + public void updateSparkConfCustomer(SparkConf sparkConf) { + sparkConf.set("spark.task.maxFailures", String.valueOf(1)); + sparkConf.set("spark." + RSS_RESUBMIT_STAGE, "true"); + + sparkConf.set("spark.sql.shuffle.partitions", "4"); + sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2"); + sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1"); + sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true"); + sparkConf.set("spark." + RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "1"); + + // simulate that the grpc servers have different free memory + // and make the assignment priority seq: g1 -> g2 -> g3 + ShuffleServer g1 = grpcShuffleServers.get(0); + ShuffleBufferManager bufferManager = g1.getShuffleBufferManager(); + bufferManager.setUsedMemory(bufferManager.getCapacity() - 3000000); + g1.sendHeartbeat(); + + ShuffleServer g2 = grpcShuffleServers.get(1); + bufferManager = g2.getShuffleBufferManager(); + bufferManager.setUsedMemory(bufferManager.getCapacity() - 2000000); + g2.sendHeartbeat(); + + ShuffleServer g3 = grpcShuffleServers.get(2); + bufferManager = g3.getShuffleBufferManager(); + bufferManager.setUsedMemory(bufferManager.getCapacity() - 1000000); + g3.sendHeartbeat(); + + // This will make the partitions on g1 be reassigned to g2. + ((MockedGrpcServer) g1.getServer()).getService().setMockSendDataFailedStageNumber(0); + // And then they would be reassigned to g3, but the max reassign times has been reached by then. + ((MockedGrpcServer) g2.getServer()).getService().setMockSendDataFailedStageNumber(0); + } +} diff --git a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java index d0d77f60d5..a25e1fd45e 100644 --- a/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java +++ b/server/src/test/java/org/apache/uniffle/server/MockedShuffleServerGrpcService.java @@ -44,6 +44,7 @@ public class MockedShuffleServerGrpcService extends ShuffleServerGrpcService { private long mockedTimeout = -1L; private boolean mockSendDataFailed = false; + private int mockSendDataFailedStageNumber = -1; private boolean mockRequireBufferFailedWithNoBuffer = false; private boolean isMockRequireBufferFailedWithNoBufferForHugePartition = false; @@ -130,6 +131,12 @@ public void sendShuffleData( LOG.info("Add a mocked sendData failed on sendShuffleData"); throw new RuntimeException("This write request is failed as mocked failure！"); } + if (mockSendDataFailedStageNumber == request.getStageAttemptNumber()) { + LOG.info( + "Add a mocked sendData failed on sendShuffleData with the stage number={}", + mockSendDataFailedStageNumber); + throw new RuntimeException("This write request is failed as mocked failure！"); + } if (mockedTimeout > 0) { LOG.info("Add a mocked timeout on sendShuffleData"); Uninterruptibles.sleepUninterruptibly(mockedTimeout, TimeUnit.MILLISECONDS); @@ -286,4 +293,12 @@ public void getLocalShuffleIndex( } super.getLocalShuffleIndex(request, responseObserver); } + + public int getMockSendDataFailedStageNumber() { + return mockSendDataFailedStageNumber; + } + + public void setMockSendDataFailedStageNumber(int mockSendDataFailedStageNumber) { + this.mockSendDataFailedStageNumber = mockSendDataFailedStageNumber; + } }
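The core of this change is that `RssShuffleManagerBase#reassignOnBlockSendFailure` no longer assumes the registered handle is a `MutableShuffleHandleInfo`: when stage retry is also enabled, the registered handle is a `StageAttemptShuffleHandleInfo`, and the reassignment must be applied to the handle of the current stage attempt. Below is a minimal sketch condensed from the diff above; the helper name `currentMutableHandle` is illustrative only (it is not part of the patch), and imports and surrounding class members are omitted.

```java
// Illustrative sketch only, condensed from reassignOnBlockSendFailure in the diff above.
private static MutableShuffleHandleInfo currentMutableHandle(ShuffleHandleInfo handleInfo) {
  if (handleInfo instanceof MutableShuffleHandleInfo) {
    // Only partition reassign is enabled: the registered handle is already mutable.
    return (MutableShuffleHandleInfo) handleInfo;
  }
  if (handleInfo instanceof StageAttemptShuffleHandleInfo) {
    // Stage retry is enabled as well: apply the reassignment to the handle
    // of the current stage attempt.
    return (MutableShuffleHandleInfo) ((StageAttemptShuffleHandleInfo) handleInfo).getCurrent();
  }
  // The patch likewise throws an RssException when no mutable handle can be resolved.
  throw new RssException("Unexpected shuffle handle type: " + handleInfo.getClass());
}
```

With this unwrapping in place, block-send failures are handled against the assignment of the latest stage attempt rather than a stale handle, which is what allows reassign and stage retry to be enabled together.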