Skip to content

Commit

Permalink
server_inactive for requireBuffer rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Apr 24, 2024
1 parent 9e14c48 commit 1c20795
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,9 @@ public void unregisterShuffle(
@Override
public void registerShuffle(
ShuffleRegisterRequest req, StreamObserver<ShuffleRegisterResponse> responseObserver) {

ShuffleRegisterResponse reply;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
String appId = req.getAppId();
String remoteStoragePath = req.getRemoteStorage().getPath();
String user = req.getUser();

Expand Down Expand Up @@ -206,22 +205,8 @@ public void registerShuffle(
@Override
public void sendShuffleData(
SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse> responseObserver) {
String appId = req.getAppId();
if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
&& shuffleServer
.getShuffleTaskManager()
.getShuffleTaskInfo(appId)
.isBlockFailureReassignEnabled()) {
responseObserver.onNext(
SendShuffleDataResponse.newBuilder()
.setStatus(StatusCode.SERVER_INACTIVE.toProto())
.setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus())
.build());
responseObserver.onCompleted();
return;
}

SendShuffleDataResponse reply;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireBufferId();
long timestamp = req.getTimestamp();
Expand Down Expand Up @@ -443,6 +428,20 @@ public void finishShuffle(
public void requireBuffer(
RequireBufferRequest request, StreamObserver<RequireBufferResponse> responseObserver) {
String appId = request.getAppId();
if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
&& shuffleServer
.getShuffleTaskManager()
.getShuffleTaskInfo(appId)
.isBlockFailureReassignEnabled()) {
responseObserver.onNext(
RequireBufferResponse.newBuilder()
.setStatus(StatusCode.SERVER_INACTIVE.toProto())
.setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus())
.build());
responseObserver.onCompleted();
return;
}

long requireBufferId = -1;
StatusCode status = StatusCode.SUCCESS;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
Expand Down Expand Up @@ -100,24 +99,6 @@ public void exceptionCaught(Throwable cause, TransportClient client) {
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
RpcResponse rpcResponse;
String appId = req.getAppId();

if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
&& shuffleServer
.getShuffleTaskManager()
.getShuffleTaskInfo(appId)
.isBlockFailureReassignEnabled()) {
req.getPartitionToBlocks().values().stream()
.flatMap(Collection::stream)
.forEach(block -> block.getData().release());
rpcResponse =
new RpcResponse(
req.getRequestId(),
StatusCode.SERVER_INACTIVE,
"Server is inactive, status: " + shuffleServer.getServerStatus());
client.getChannel().writeAndFlush(rpcResponse);
return;
}

int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireId();
long timestamp = req.getTimestamp();
Expand Down

0 comments on commit 1c20795

Please sign in to comment.