[#1608] feat(spark3): Ensure the compatibility of reassign and stageRetry (#1783)

### What changes were proposed in this pull request?

Ensure the compatibility of reassign and stageRetry.

### Why are the changes needed?

To improve job stability when reassign and stage retry are both enabled.
For #1608

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests
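
For context, a minimal sketch of the driver-side configuration that turns on both mechanisms at once, mirroring the settings applied by the new `ReassignAndStageRetryTest` in this PR. The class name below is hypothetical; only the two conf keys and the `"spark." + ...` prefixing pattern come from the test itself.

```java
import org.apache.spark.SparkConf;

import static org.apache.uniffle.client.util.RssClientConfig.RSS_RESUBMIT_STAGE;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;

public class ReassignWithStageRetrySketch {
  public static SparkConf buildConf() {
    SparkConf sparkConf = new SparkConf();
    // Enable stage retry and partition reassign together -- the combination this PR makes compatible.
    sparkConf.set("spark." + RSS_RESUBMIT_STAGE, "true");
    sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
    return sparkConf;
  }
}
```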
zuston committed Jun 19, 2024
1 parent 7794f0f commit 38dcff1
Showing 5 changed files with 123 additions and 25 deletions.
File 1 of 5
@@ -117,7 +117,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac

protected boolean blockIdSelfManagedEnabled;

protected boolean taskBlockSendFailureRetryEnabled;
protected boolean partitionReassignEnabled;

protected boolean shuffleManagerRpcServiceEnabled;

@@ -638,7 +638,7 @@ public ShuffleHandleInfo getShuffleHandleInfoByShuffleId(int shuffleId) {
@Override
public int getMaxFetchFailures() {
final String TASK_MAX_FAILURE = "spark.task.maxFailures";
return Math.max(1, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1);
return Math.max(0, sparkConf.getInt(TASK_MAX_FAILURE, 4) - 1);
}

/**
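
A quick worked illustration of the boundary change in getMaxFetchFailures above, assuming spark.task.maxFailures is set to 1, as the new ReassignAndStageRetryTest does. The standalone class below is hypothetical and only evaluates the two formulas:

```java
public class MaxFetchFailuresBoundary {
  public static void main(String[] args) {
    int taskMaxFailures = 1; // spark.task.maxFailures as set by ReassignAndStageRetryTest
    int before = Math.max(1, taskMaxFailures - 1); // old lower bound -> 1
    int after = Math.max(0, taskMaxFailures - 1);  // new lower bound -> 0
    System.out.println("before=" + before + ", after=" + after);
  }
}
```

With the old floor of 1, a job configured with a single allowed task failure still reported a max fetch failure count of 1; with the new floor of 0, the first fetch failure can already exceed the threshold, presumably so that the stage retry path can take over immediately in that configuration.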
@@ -701,6 +701,10 @@ public boolean reassignOnStageResubmit(
(StageAttemptShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
stageAttemptShuffleHandleInfo.replaceCurrentShuffleHandleInfo(shuffleHandleInfo);
rssStageResubmitManager.recordAndGetServerAssignedInfo(shuffleId, stageIdAndAttempt, true);
LOG.info(
"The stage retry has been triggered successfully for the stageId: {}, attemptNumber: {}",
stageId,
stageAttemptNumber);
return true;
} else {
LOG.info(
@@ -720,12 +724,22 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
long startTime = System.currentTimeMillis();
MutableShuffleHandleInfo handleInfo =
(MutableShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
synchronized (handleInfo) {
ShuffleHandleInfo handleInfo = shuffleHandleInfoManager.get(shuffleId);
MutableShuffleHandleInfo internalHandle = null;
if (handleInfo instanceof MutableShuffleHandleInfo) {
internalHandle = (MutableShuffleHandleInfo) handleInfo;
} else if (handleInfo instanceof StageAttemptShuffleHandleInfo) {
internalHandle =
(MutableShuffleHandleInfo) ((StageAttemptShuffleHandleInfo) handleInfo).getCurrent();
}
if (internalHandle == null) {
throw new RssException(
"An unexpected error occurred: internalHandle is null, which should not happen");
}
synchronized (internalHandle) {
// If the reassignment servers for one partition exceed the max reassign server num,
// it should fast fail.
handleInfo.checkPartitionReassignServerNum(
internalHandle.checkPartitionReassignServerNum(
partitionToFailureServers.keySet(), partitionReassignMaxServerNum);

Map<ShuffleServerInfo, List<PartitionRange>> newServerToPartitions = new HashMap<>();
@@ -740,10 +754,10 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
String serverId = receivingFailureServer.getServerId();

boolean serverHasReplaced = false;
Set<ShuffleServerInfo> replacements = handleInfo.getReplacements(serverId);
Set<ShuffleServerInfo> replacements = internalHandle.getReplacements(serverId);
if (CollectionUtils.isEmpty(replacements)) {
final int requiredServerNum = 1;
Set<String> excludedServers = new HashSet<>(handleInfo.listExcludedServers());
Set<String> excludedServers = new HashSet<>(internalHandle.listExcludedServers());
excludedServers.add(serverId);
replacements =
reassignServerForTask(
@@ -759,7 +773,7 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
}

Set<ShuffleServerInfo> updatedReassignServers =
handleInfo.updateAssignment(partitionId, serverId, replacements);
internalHandle.updateAssignment(partitionId, serverId, replacements);

reassignResult
.computeIfAbsent(serverId, x -> new HashMap<>())
@@ -788,7 +802,7 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
System.currentTimeMillis() - startTime,
reassignResult);

return handleInfo;
return internalHandle;
}
}

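The core of the fix above is that, when stage retry is enabled, the handle registered for a shuffle is a StageAttemptShuffleHandleInfo wrapping the current mutable handle, so reassignOnBlockSendFailure must unwrap it via getCurrent() rather than casting directly to MutableShuffleHandleInfo. A minimal self-contained sketch of that unwrapping pattern follows; the simplified types are stand-ins for the real Uniffle classes, not their actual definitions.

```java
// Simplified stand-ins for ShuffleHandleInfo, MutableShuffleHandleInfo and
// StageAttemptShuffleHandleInfo; only the shape of the unwrapping logic matters.
interface Handle {}

class MutableHandle implements Handle {
  void reassign(String failedServer) {
    System.out.println("reassigning partitions away from " + failedServer);
  }
}

class StageAttemptHandle implements Handle {
  private final MutableHandle current = new MutableHandle();
  MutableHandle getCurrent() { return current; } // mirrors StageAttemptShuffleHandleInfo#getCurrent
}

public class UnwrapSketch {
  static MutableHandle unwrap(Handle handle) {
    if (handle instanceof MutableHandle) {
      return (MutableHandle) handle;                     // reassign-only mode
    } else if (handle instanceof StageAttemptHandle) {
      return ((StageAttemptHandle) handle).getCurrent(); // stage retry + reassign mode
    }
    throw new IllegalStateException("unexpected handle type: " + handle);
  }

  public static void main(String[] args) {
    unwrap(new StageAttemptHandle()).reassign("server-1");
  }
}
```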
File 2 of 5
@@ -168,11 +168,10 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetryEnabled =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.partitionReassignEnabled = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
taskBlockSendFailureRetryEnabled || rssResubmitStage || blockIdSelfManagedEnabled;
partitionReassignEnabled || rssResubmitStage || blockIdSelfManagedEnabled;
if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
if (isDriver) {
heartBeatScheduledExecutorService =
@@ -341,7 +340,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, handleInfo);
shuffleHandleInfoManager.register(shuffleId, stageAttemptShuffleHandleInfo);
} else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) {
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
ShuffleHandleInfo shuffleHandleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo);
@@ -410,7 +409,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
} else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) {
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
} else {
@@ -483,7 +482,7 @@ public <K, C> ShuffleReader<K, C> getReader(
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
} else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) {
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
} else {
File 3 of 5
@@ -182,20 +182,19 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskBlockSendFailureRetryEnabled =
rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.partitionReassignEnabled = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);

// The feature of partition reassign is exclusive with the multiple replicas mechanism.
if (taskBlockSendFailureRetryEnabled) {
if (rssResubmitStage || dataReplica > 1) {
if (partitionReassignEnabled) {
if (dataReplica > 1) {
throw new RssException(
"The feature of partition reassign is incompatible with multiple replicas and stage retry.");
"The feature of task partition reassign is incompatible with multiple replicas mechanism.");
}
}

this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
taskBlockSendFailureRetryEnabled || rssResubmitStage || blockIdSelfManagedEnabled;
partitionReassignEnabled || rssResubmitStage || blockIdSelfManagedEnabled;
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
@@ -455,7 +454,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
StageAttemptShuffleHandleInfo handleInfo =
new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, shuffleHandleInfo);
shuffleHandleInfoManager.register(shuffleId, handleInfo);
} else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) {
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
ShuffleHandleInfo shuffleHandleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo);
@@ -496,7 +495,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
} else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) {
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
} else {
@@ -640,7 +639,7 @@ public <K, C> ShuffleReader<K, C> getReaderImpl(
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
} else if (shuffleManagerRpcServiceEnabled && taskBlockSendFailureRetryEnabled) {
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithBlockRetry(shuffleId);
} else {
File 4 of 5
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.test;

import org.apache.spark.SparkConf;

import org.apache.uniffle.server.MockedGrpcServer;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_CLIENT_RETRY_MAX;
import static org.apache.uniffle.client.util.RssClientConfig.RSS_RESUBMIT_STAGE;
import static org.apache.uniffle.common.config.RssClientConf.RSS_CLIENT_REASSIGN_ENABLED;

/**
* This class tests the compatibility of the reassign and stage retry mechanisms when they are
* enabled at the same time.
*/
public class ReassignAndStageRetryTest extends PartitionBlockDataReassignMultiTimesTest {

@Override
public void updateSparkConfCustomer(SparkConf sparkConf) {
sparkConf.set("spark.task.maxFailures", String.valueOf(1));
sparkConf.set("spark." + RSS_RESUBMIT_STAGE, "true");

sparkConf.set("spark.sql.shuffle.partitions", "4");
sparkConf.set("spark." + RSS_CLIENT_RETRY_MAX, "2");
sparkConf.set("spark." + RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER, "1");
sparkConf.set("spark." + RSS_CLIENT_REASSIGN_ENABLED.key(), "true");
sparkConf.set("spark." + RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES.key(), "1");

// simulate the grpc servers having different amounts of free memory
// so that the assignment priority sequence is: g1 -> g2 -> g3
ShuffleServer g1 = grpcShuffleServers.get(0);
ShuffleBufferManager bufferManager = g1.getShuffleBufferManager();
bufferManager.setUsedMemory(bufferManager.getCapacity() - 3000000);
g1.sendHeartbeat();

ShuffleServer g2 = grpcShuffleServers.get(1);
bufferManager = g2.getShuffleBufferManager();
bufferManager.setUsedMemory(bufferManager.getCapacity() - 2000000);
g2.sendHeartbeat();

ShuffleServer g3 = grpcShuffleServers.get(2);
bufferManager = g3.getShuffleBufferManager();
bufferManager.setUsedMemory(bufferManager.getCapacity() - 1000000);
g3.sendHeartbeat();

// This will make the partitions assigned to g1 be reassigned to g2.
((MockedGrpcServer) g1.getServer()).getService().setMockSendDataFailedStageNumber(0);
// And then reassign to g3. But at that point the max number of reassign times is reached.
((MockedGrpcServer) g2.getServer()).getService().setMockSendDataFailedStageNumber(0);
}
}
File 5 of 5
@@ -44,6 +44,7 @@ public class MockedShuffleServerGrpcService extends ShuffleServerGrpcService {
private long mockedTimeout = -1L;

private boolean mockSendDataFailed = false;
private int mockSendDataFailedStageNumber = -1;

private boolean mockRequireBufferFailedWithNoBuffer = false;
private boolean isMockRequireBufferFailedWithNoBufferForHugePartition = false;
@@ -130,6 +131,12 @@ public void sendShuffleData(
LOG.info("Add a mocked sendData failed on sendShuffleData");
throw new RuntimeException("This write request is failed as mocked failure!");
}
if (mockSendDataFailedStageNumber == request.getStageAttemptNumber()) {
LOG.info(
"Add a mocked sendData failed on sendShuffleData with the stage number={}",
mockSendDataFailedStageNumber);
throw new RuntimeException("This write request is failed as mocked failure!");
}
if (mockedTimeout > 0) {
LOG.info("Add a mocked timeout on sendShuffleData");
Uninterruptibles.sleepUninterruptibly(mockedTimeout, TimeUnit.MILLISECONDS);
@@ -286,4 +293,12 @@ public void getLocalShuffleIndex(
}
super.getLocalShuffleIndex(request, responseObserver);
}

public int getMockSendDataFailedStageNumber() {
return mockSendDataFailedStageNumber;
}

public void setMockSendDataFailedStageNumber(int mockSendDataFailedStageNumber) {
this.mockSendDataFailedStageNumber = mockSendDataFailedStageNumber;
}
}
