Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1608] improvement(spark3): Output more task-level info on the driver side when reassigning on block send failure #1771

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void getPartitionToShufflerServer(
@Override
public void reassignShuffleServers(
RssProtos.ReassignServersRequest request,
StreamObserver<RssProtos.ReassignServersReponse> responseObserver) {
StreamObserver<RssProtos.ReassignServersResponse> responseObserver) {
int stageId = request.getStageId();
int stageAttemptNumber = request.getStageAttemptNumber();
int shuffleId = request.getShuffleId();
Expand All @@ -223,8 +223,8 @@ public void reassignShuffleServers(
shuffleManager.reassignAllShuffleServersForWholeStage(
stageId, stageAttemptNumber, shuffleId, numPartitions);
RssProtos.StatusCode code = RssProtos.StatusCode.SUCCESS;
RssProtos.ReassignServersReponse reply =
RssProtos.ReassignServersReponse.newBuilder()
RssProtos.ReassignServersResponse reply =
RssProtos.ReassignServersResponse.newBuilder()
.setStatus(code)
.setNeedReassign(needReassign)
.build();
Expand All @@ -241,6 +241,13 @@ public void reassignOnBlockSendFailure(
RssProtos.StatusCode code = RssProtos.StatusCode.INTERNAL_ERROR;
RssProtos.RssReassignOnBlockSendFailureResponse reply;
try {
LOG.info(
"Accepted reassign request on block sent failure for shuffleId: {}, stageId: {}, stageAttemptNumber: {} from taskAttemptId: {} on executorId: {}",
request.getShuffleId(),
request.getStageId(),
request.getStageAttemptNumber(),
request.getTaskAttemptId(),
request.getExecutorId());
MutableShuffleHandleInfo handle =
shuffleManager.reassignOnBlockSendFailure(
request.getShuffleId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleBlockInfo;
Expand Down Expand Up @@ -548,11 +548,11 @@ private void throwFetchFailedIfNecessary(Exception e) {
taskContext.stageAttemptNumber(),
shuffleId,
partitioner.numPartitions());
RssReassignServersReponse rssReassignServersReponse =
RssReassignServersResponse rssReassignServersResponse =
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
rssReassignServersReponse.isNeedReassign());
rssReassignServersResponse.isNeedReassign());
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,18 +1380,21 @@ private Map<Integer, List<ShuffleServerInfo>> requestShuffleAssignment(
assignmentShuffleServerNumber,
estimateTaskConcurrency,
faultyServerIds);
LOG.info("Finished reassign");
LOG.info("Finished the shuffle assignment request to coordinator.");
if (reassignmentHandler != null) {
response = reassignmentHandler.apply(response);
}
LOG.info(
"Register the partition->servers assignment. {}",
response.getServerToPartitionRanges());
registerShuffleServers(
id.get(), shuffleId, response.getServerToPartitionRanges(), getRemoteStorageInfo());
return response.getPartitionToServers();
},
retryInterval,
retryTimes);
} catch (Throwable throwable) {
throw new RssException("registerShuffle failed!", throwable);
throw new RssException("Errors on requesting shuffle assignment!", throwable);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
Expand All @@ -76,7 +77,7 @@
import org.apache.uniffle.client.request.RssReassignServersRequest;
import org.apache.uniffle.client.request.RssReportShuffleWriteFailureRequest;
import org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ReceivingFailureServer;
Expand Down Expand Up @@ -582,8 +583,18 @@ private void doReassignOnBlockSendFailure(
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
try (ShuffleManagerClient shuffleManagerClient = createShuffleManagerClient(driver, port)) {
String executorId = SparkEnv.get().executorId();
long taskAttemptId = taskContext.taskAttemptId();
int stageId = taskContext.stageId();
int stageAttemptNum = taskContext.stageAttemptNumber();
RssReassignOnBlockSendFailureRequest request =
new RssReassignOnBlockSendFailureRequest(shuffleId, failurePartitionToServers);
new RssReassignOnBlockSendFailureRequest(
shuffleId,
failurePartitionToServers,
executorId,
taskAttemptId,
stageId,
stageAttemptNum);
RssReassignOnBlockSendFailureResponse response =
shuffleManagerClient.reassignOnBlockSendFailure(request);
if (response.getStatusCode() != StatusCode.SUCCESS) {
Expand Down Expand Up @@ -815,11 +826,11 @@ private void throwFetchFailedIfNecessary(Exception e) {
taskContext.stageAttemptNumber(),
shuffleId,
partitioner.numPartitions());
RssReassignServersReponse rssReassignServersReponse =
RssReassignServersResponse rssReassignServersResponse =
shuffleManagerClient.reassignShuffleServers(rssReassignServersRequest);
LOG.info(
"Whether the reassignment is successful: {}",
rssReassignServersReponse.isNeedReassign());
rssReassignServersResponse.isNeedReassign());
// since we are going to roll out the whole stage, mapIndex shouldn't matter, hence -1 is
// provided.
FetchFailedException ffe =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
Expand All @@ -51,7 +51,7 @@ RssPartitionToShuffleServerResponse getPartitionToShufflerServer(
RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
RssReportShuffleWriteFailureRequest req);

RssReassignServersReponse reassignShuffleServers(RssReassignServersRequest req);
RssReassignServersResponse reassignShuffleServers(RssReassignServersRequest req);

RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
RssReassignOnBlockSendFailureRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssPartitionToShuffleServerResponse;
import org.apache.uniffle.client.response.RssReassignOnBlockSendFailureResponse;
import org.apache.uniffle.client.response.RssReassignServersReponse;
import org.apache.uniffle.client.response.RssReassignServersResponse;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
Expand Down Expand Up @@ -116,11 +116,11 @@ public RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
}

@Override
public RssReassignServersReponse reassignShuffleServers(RssReassignServersRequest req) {
public RssReassignServersResponse reassignShuffleServers(RssReassignServersRequest req) {
RssProtos.ReassignServersRequest reassignServersRequest = req.toProto();
RssProtos.ReassignServersReponse reassignServersReponse =
RssProtos.ReassignServersResponse reassignServersResponse =
getBlockingStub().reassignShuffleServers(reassignServersRequest);
return RssReassignServersReponse.fromProto(reassignServersReponse);
return RssReassignServersResponse.fromProto(reassignServersResponse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,24 @@
public class RssReassignOnBlockSendFailureRequest {
private int shuffleId;
private Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers;
private String executorId;
private long taskAttemptId;
private int stageId;
private int stageAttemptNumber;

public RssReassignOnBlockSendFailureRequest(
int shuffleId, Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers) {
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> failurePartitionToServers,
String executorId,
long taskAttemptId,
int stageId,
int stageAttemptNum) {
this.shuffleId = shuffleId;
this.failurePartitionToServers = failurePartitionToServers;
this.executorId = executorId;
this.taskAttemptId = taskAttemptId;
this.stageId = stageId;
this.stageAttemptNumber = stageAttemptNum;
}

public static RssProtos.RssReassignOnBlockSendFailureRequest toProto(
Expand All @@ -43,6 +56,10 @@ public static RssProtos.RssReassignOnBlockSendFailureRequest toProto(
.collect(
Collectors.toMap(
Map.Entry::getKey, x -> ReceivingFailureServer.toProto(x.getValue()))))
.setExecutorId(request.executorId)
.setStageId(request.stageId)
.setStageAttemptNumber(request.stageAttemptNumber)
.setTaskAttemptId(request.taskAttemptId)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.proto.RssProtos;

public class RssReassignServersReponse extends ClientResponse {
public class RssReassignServersResponse extends ClientResponse {

private boolean needReassign;

public RssReassignServersReponse(StatusCode statusCode, String message, boolean needReassign) {
public RssReassignServersResponse(StatusCode statusCode, String message, boolean needReassign) {
super(statusCode, message);
this.needReassign = needReassign;
}
Expand All @@ -33,8 +33,8 @@ public boolean isNeedReassign() {
return needReassign;
}

public static RssReassignServersReponse fromProto(RssProtos.ReassignServersReponse response) {
return new RssReassignServersReponse(
public static RssReassignServersResponse fromProto(RssProtos.ReassignServersResponse response) {
return new RssReassignServersResponse(
// todo: [issue#780] add fromProto for StatusCode issue
StatusCode.valueOf(response.getStatus().name()),
response.getMsg(),
Expand Down
10 changes: 7 additions & 3 deletions proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ service ShuffleManager {
// Report write failures to ShuffleManager
rpc reportShuffleWriteFailure (ReportShuffleWriteFailureRequest) returns (ReportShuffleWriteFailureResponse);
// RPC interface for reassigning the ShuffleServer list
rpc reassignShuffleServers(ReassignServersRequest) returns (ReassignServersReponse);
rpc reassignShuffleServers(ReassignServersRequest) returns (ReassignServersResponse);
// Reassign on block send failure that occurs in writer
rpc reassignOnBlockSendFailure(RssReassignOnBlockSendFailureRequest) returns (RssReassignOnBlockSendFailureResponse);
rpc reportShuffleResult (ReportShuffleResultRequest) returns (ReportShuffleResultResponse);
Expand Down Expand Up @@ -609,15 +609,19 @@ message ReassignServersRequest{
int32 numPartitions = 4;
}

message ReassignServersReponse{
message ReassignServersResponse {
StatusCode status = 1;
bool needReassign = 2;
string msg = 3;
}

message RssReassignOnBlockSendFailureRequest{
message RssReassignOnBlockSendFailureRequest {
int32 shuffleId = 1;
map<int32, ReceivingFailureServers> failurePartitionToServerIds = 2;
int64 taskAttemptId = 3;
int32 stageId = 4;
int32 stageAttemptNumber = 5;
string executorId = 6;
}

message ReceivingFailureServers {
Expand Down
Loading