Skip to content

Commit

Permalink
[#1791] feat(spark)(coordinator): Take more infos on getting assignme…
Browse files Browse the repository at this point in the history
…nt to track app reassign/stageRetry (#1792)

### What changes were proposed in this pull request?

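Pass additional information (stage ID, stage attempt number, and a reassign flag) when requesting shuffle assignments, so that app reassignment and stage retries can be tracked.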

### Why are the changes needed?

For #1791

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.
  • Loading branch information
zuston committed Jun 18, 2024
1 parent e3ec90f commit 7794f0f
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,11 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency) {
int estimateTaskConcurrency,
Set<String> faultyServerIds,
int stageId,
int stageAttemptNumber,
boolean reassign) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,11 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency) {
int estimateTaskConcurrency,
Set<String> faultyServerIds,
int stageId,
int stageAttemptNumber,
boolean reassign) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,9 @@ public boolean reassignOnStageResubmit(
requiredShuffleServerNumber,
estimateTaskConcurrency,
rssStageResubmitManager.getServerIdBlackList(),
stageAttemptNumber);
stageId,
stageAttemptNumber,
false);
/**
* We need to clear the metadata of the completed task; otherwise, some of the stage's data
* will be lost.
Expand Down Expand Up @@ -713,7 +715,10 @@ public boolean reassignOnStageResubmit(
/** This is only valid on the driver side, where it is exposed for invocation by the gRPC server. */
@Override
public MutableShuffleHandleInfo reassignOnBlockSendFailure(
int shuffleId, Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
long startTime = System.currentTimeMillis();
MutableShuffleHandleInfo handleInfo =
(MutableShuffleHandleInfo) shuffleHandleInfoManager.get(shuffleId);
Expand Down Expand Up @@ -742,7 +747,13 @@ public MutableShuffleHandleInfo reassignOnBlockSendFailure(
excludedServers.add(serverId);
replacements =
reassignServerForTask(
shuffleId, Sets.newHashSet(partitionId), excludedServers, requiredServerNum);
stageId,
stageAttemptNumber,
shuffleId,
Sets.newHashSet(partitionId),
excludedServers,
requiredServerNum,
true);
} else {
serverHasReplaced = true;
}
Expand Down Expand Up @@ -805,10 +816,13 @@ private ShuffleAssignmentsInfo createShuffleAssignmentsInfo(

/** Requests new shuffle servers to replace a faulty server. */
private Set<ShuffleServerInfo> reassignServerForTask(
int stageId,
int stageAttemptNumber,
int shuffleId,
Set<Integer> partitionIds,
Set<String> excludedServers,
int requiredServerNum) {
int requiredServerNum,
boolean reassign) {
AtomicReference<Set<ShuffleServerInfo>> replacementsRef =
new AtomicReference<>(new HashSet<>());
requestShuffleAssignment(
Expand All @@ -828,7 +842,10 @@ private Set<ShuffleServerInfo> reassignServerForTask(
.collect(Collectors.toSet());
replacementsRef.set(replacements);
return createShuffleAssignmentsInfo(replacements, partitionIds);
});
},
stageId,
stageAttemptNumber,
reassign);
return replacementsRef.get();
}

Expand All @@ -839,7 +856,10 @@ private Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds,
Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo> reassignmentHandler) {
Function<ShuffleAssignmentsInfo, ShuffleAssignmentsInfo> reassignmentHandler,
int stageId,
int stageAttemptNumber,
boolean reassign) {
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
ClientUtils.validateClientType(clientType);
assignmentTags.add(clientType);
Expand All @@ -858,7 +878,10 @@ private Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
assignmentTags,
assignmentShuffleServerNumber,
estimateTaskConcurrency,
faultyServerIds);
faultyServerIds,
stageId,
stageAttemptNumber,
reassign);
LOG.info("Finished reassign");
if (reassignmentHandler != null) {
response = reassignmentHandler.apply(response);
Expand All @@ -881,7 +904,9 @@ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds,
int stageAttemptNumber) {
int stageId,
int stageAttemptNumber,
boolean reassign) {
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
ClientUtils.validateClientType(clientType);
assignmentTags.add(clientType);
Expand All @@ -901,7 +926,10 @@ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
assignmentTags,
assignmentShuffleServerNumber,
estimateTaskConcurrency,
faultyServerIds);
faultyServerIds,
stageId,
stageAttemptNumber,
reassign);
registerShuffleServers(
appId,
shuffleId,
Expand All @@ -917,6 +945,26 @@ protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
}
}

/**
 * Overload of {@code requestShuffleAssignment} that takes neither a stage id nor a reassign
 * flag.
 *
 * <p>Every argument is forwarded unchanged to the overload that also accepts {@code stageId} and
 * {@code reassign}; {@code -1} is supplied as the stage id and {@code false} as the reassign
 * flag.
 *
 * @param shuffleId id of the shuffle to request an assignment for
 * @param partitionNum forwarded as-is to the full overload
 * @param partitionNumPerRange forwarded as-is to the full overload
 * @param assignmentShuffleServerNumber forwarded as-is to the full overload
 * @param estimateTaskConcurrency forwarded as-is to the full overload
 * @param faultyServerIds forwarded as-is to the full overload
 * @param stageAttemptNumber forwarded as-is to the full overload
 * @return whatever the full overload returns
 */
protected Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
    int shuffleId,
    int partitionNum,
    int partitionNumPerRange,
    int assignmentShuffleServerNumber,
    int estimateTaskConcurrency,
    Set<String> faultyServerIds,
    int stageAttemptNumber) {
  // This overload has no stage id parameter, so a placeholder value is forwarded instead.
  final int placeholderStageId = -1;
  final boolean reassign = false;

  final Map<Integer, List<ShuffleServerInfo>> assignment =
      requestShuffleAssignment(
          shuffleId,
          partitionNum,
          partitionNumPerRange,
          assignmentShuffleServerNumber,
          estimateTaskConcurrency,
          faultyServerIds,
          placeholderStageId,
          stageAttemptNumber,
          reassign);
  return assignment;
}

protected void registerShuffleServers(
String appId,
int shuffleId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,8 @@ public interface RssShuffleManagerInterface {
/**
 * Reassigns shuffle servers for a shuffle when its stage is resubmitted.
 *
 * @param stageId id of the resubmitted stage
 * @param stageAttemptNumber attempt number of the resubmitted stage
 * @param shuffleId id of the shuffle to reassign
 * @param numMaps NOTE(review): presumably the number of map tasks of the shuffle; confirm
 *     against the implementation
 * @return NOTE(review): the meaning of the flag is not visible from this declaration;
 *     presumably whether the reassignment was performed; confirm against the implementation
 */
boolean reassignOnStageResubmit(int stageId, int stageAttemptNumber, int shuffleId, int numMaps);

MutableShuffleHandleInfo reassignOnBlockSendFailure(
int shuffleId, Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ public void reassignOnBlockSendFailure(
request.getExecutorId());
MutableShuffleHandleInfo handle =
shuffleManager.reassignOnBlockSendFailure(
request.getStageId(),
request.getStageAttemptNumber(),
request.getShuffleId(),
request.getFailurePartitionToServerIdsMap().entrySet().stream()
.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ public boolean reassignOnStageResubmit(

@Override
public MutableShuffleHandleInfo reassignOnBlockSendFailure(
int shuffleId, Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,11 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency) {
int estimateTaskConcurrency,
Set<String> faultyServerIds,
int stageId,
int stageAttemptNumber,
boolean reassign) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.uniffle.client.api;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -99,6 +100,19 @@ void reportShuffleResult(
long taskAttemptId,
int bitmapNum);

/**
 * Requests shuffle assignments for one shuffle of an application, carrying stage information
 * and a reassign flag along with the request.
 *
 * <p>This is the abstract variant that implementations must provide.
 *
 * @param appId id of the application
 * @param shuffleId id of the shuffle within the application
 * @param partitionNum number of partitions of the shuffle
 * @param partitionNumPerRange number of partitions per partition range
 * @param requiredTags tags sent with the request; NOTE(review): presumably used to filter
 *     candidate shuffle servers; confirm
 * @param assignmentShuffleServerNumber number of shuffle servers requested for the assignment
 * @param estimateTaskConcurrency estimated task concurrency sent with the request
 * @param faultyServerIds ids of servers reported as faulty; NOTE(review): presumably excluded
 *     from the returned assignment; confirm
 * @param stageId id of the requesting stage
 * @param stageAttemptNumber attempt number of the requesting stage
 * @param reassign whether this request is a reassignment
 * @return the assignment information for the shuffle
 */
ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
int shuffleId,
int partitionNum,
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds,
int stageId,
int stageAttemptNumber,
boolean reassign);

default ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
int shuffleId,
Expand All @@ -108,19 +122,38 @@ default ShuffleAssignmentsInfo getShuffleAssignments(
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds) {
throw new UnsupportedOperationException(
this.getClass().getName()
+ " doesn't implement getShuffleAssignments with faultyServerIds");
return getShuffleAssignments(
appId,
shuffleId,
partitionNum,
partitionNumPerRange,
requiredTags,
assignmentShuffleServerNumber,
estimateTaskConcurrency,
faultyServerIds,
-1,
0,
false);
}

ShuffleAssignmentsInfo getShuffleAssignments(
default ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
int shuffleId,
int partitionNum,
int partitionNumPerRange,
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency);
int estimateTaskConcurrency) {
return getShuffleAssignments(
appId,
shuffleId,
partitionNum,
partitionNumPerRange,
requiredTags,
assignmentShuffleServerNumber,
estimateTaskConcurrency,
Collections.emptySet());
}

Roaring64NavigableMap getShuffleResult(
String clientType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,26 +635,6 @@ public RemoteStorageInfo fetchRemoteStorage(String appId) {
return remoteStorage;
}

/**
 * Requests shuffle assignments without reporting any faulty servers.
 *
 * <p>Delegates to the overload that additionally accepts {@code faultyServerIds}, handing it a
 * newly created, empty concurrent set.
 */
@Override
public ShuffleAssignmentsInfo getShuffleAssignments(
    String appId,
    int shuffleId,
    int partitionNum,
    int partitionNumPerRange,
    Set<String> requiredTags,
    int assignmentShuffleServerNumber,
    int estimateTaskConcurrency) {
  // No faulty servers are known on this path, so an empty set is passed along.
  final Set<String> noFaultyServers = Sets.newConcurrentHashSet();
  return getShuffleAssignments(
      appId,
      shuffleId,
      partitionNum,
      partitionNumPerRange,
      requiredTags,
      assignmentShuffleServerNumber,
      estimateTaskConcurrency,
      noFaultyServers);
}

@Override
public ShuffleAssignmentsInfo getShuffleAssignments(
String appId,
Expand All @@ -664,7 +644,10 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds) {
Set<String> faultyServerIds,
int stageId,
int stageAttemptNumber,
boolean reassign) {
RssGetShuffleAssignmentsRequest request =
new RssGetShuffleAssignmentsRequest(
appId,
Expand All @@ -675,7 +658,10 @@ public ShuffleAssignmentsInfo getShuffleAssignments(
requiredTags,
assignmentShuffleServerNumber,
estimateTaskConcurrency,
faultyServerIds);
faultyServerIds,
stageId,
stageAttemptNumber,
reassign);

RssGetShuffleAssignmentsResponse response =
new RssGetShuffleAssignmentsResponse(StatusCode.INTERNAL_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,20 @@ public void getShuffleAssignments(
final Set<String> faultyServerIds = new HashSet<>(request.getFaultyServerIdsList());

LOG.info(
"Request of getShuffleAssignments for appId[{}], shuffleId[{}], partitionNum[{}], "
+ " partitionNumPerRange[{}], replica[{}], requiredTags[{}], requiredShuffleServerNumber[{}],faultyServerIds[{}]",
"Request of getShuffleAssignments for appId[{}], shuffleId[{}], partitionNum[{}],"
+ " partitionNumPerRange[{}], replica[{}], requiredTags[{}], requiredShuffleServerNumber[{}],"
+ " faultyServerIds[{}], stageId[{}], stageAttemptNumber[{}], isReassign[{}]",
appId,
shuffleId,
partitionNum,
partitionNumPerRange,
replica,
requiredTags,
requiredShuffleServerNumber,
faultyServerIds.size());
faultyServerIds.size(),
request.getStageId(),
request.getStageAttemptNumber(),
request.getReassign());

GetShuffleAssignmentsResponse response;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ public RssProtos.GetShuffleAssignmentsResponse doGetShuffleAssignments(
Set<String> requiredTags,
int assignmentShuffleServerNumber,
int estimateTaskConcurrency,
Set<String> faultyServerIds) {
Set<String> faultyServerIds,
int stageId,
int stageAttemptNumber,
boolean reassign) {
RssProtos.GetShuffleServerRequest getServerRequest =
RssProtos.GetShuffleServerRequest.newBuilder()
.setApplicationId(appId)
Expand All @@ -190,6 +193,9 @@ public RssProtos.GetShuffleAssignmentsResponse doGetShuffleAssignments(
.setAssignmentShuffleServerNumber(assignmentShuffleServerNumber)
.setEstimateTaskConcurrency(estimateTaskConcurrency)
.addAllFaultyServerIds(faultyServerIds)
.setStageId(stageId)
.setStageAttemptNumber(stageAttemptNumber)
.setReassign(reassign)
.build();

return blockingStub.getShuffleAssignments(getServerRequest);
Expand Down Expand Up @@ -283,7 +289,10 @@ public RssGetShuffleAssignmentsResponse getShuffleAssignments(
request.getRequiredTags(),
request.getAssignmentShuffleServerNumber(),
request.getEstimateTaskConcurrency(),
request.getFaultyServerIds());
request.getFaultyServerIds(),
request.getStageId(),
request.getStageAttemptNumber(),
request.isReassign());

RssGetShuffleAssignmentsResponse response;
RssProtos.StatusCode statusCode = rpcResponse.getStatus();
Expand Down
Loading

0 comments on commit 7794f0f

Please sign in to comment.