From 1c20795623286ceef7eba940149c5717155b3951 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 24 Apr 2024 11:13:04 +0800 Subject: [PATCH] server_inactive for requireBuffer rpc --- .../server/ShuffleServerGrpcService.java | 33 +++++++++---------- .../netty/ShuffleServerNettyHandler.java | 19 ----------- 2 files changed, 16 insertions(+), 36 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 041f2fd3fe..732131053f 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -152,10 +152,9 @@ public void unregisterShuffle( @Override public void registerShuffle( ShuffleRegisterRequest req, StreamObserver responseObserver) { - ShuffleRegisterResponse reply; - String appId = req.getAppId(); int shuffleId = req.getShuffleId(); + String appId = req.getAppId(); String remoteStoragePath = req.getRemoteStorage().getPath(); String user = req.getUser(); @@ -206,22 +205,8 @@ public void registerShuffle( @Override public void sendShuffleData( SendShuffleDataRequest req, StreamObserver responseObserver) { - String appId = req.getAppId(); - if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE - && shuffleServer - .getShuffleTaskManager() - .getShuffleTaskInfo(appId) - .isBlockFailureReassignEnabled()) { - responseObserver.onNext( - SendShuffleDataResponse.newBuilder() - .setStatus(StatusCode.SERVER_INACTIVE.toProto()) - .setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus()) - .build()); - responseObserver.onCompleted(); - return; - } - SendShuffleDataResponse reply; + String appId = req.getAppId(); int shuffleId = req.getShuffleId(); long requireBufferId = req.getRequireBufferId(); long timestamp = req.getTimestamp(); @@ -443,6 +428,20 @@ public void finishShuffle( public void requireBuffer( RequireBufferRequest request, StreamObserver responseObserver) { String appId = request.getAppId(); + if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE + && shuffleServer + .getShuffleTaskManager() + .getShuffleTaskInfo(appId) + .isBlockFailureReassignEnabled()) { + responseObserver.onNext( + RequireBufferResponse.newBuilder() + .setStatus(StatusCode.SERVER_INACTIVE.toProto()) + .setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus()) + .build()); + responseObserver.onCompleted(); + return; + } + long requireBufferId = -1; StatusCode status = StatusCode.SUCCESS; try { diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java index dbc01274eb..dbda25abc0 100644 --- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java +++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import org.apache.uniffle.common.BufferSegment; -import org.apache.uniffle.common.ServerStatus; import org.apache.uniffle.common.ShuffleBlockInfo; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShuffleIndexResult; @@ -100,24 +99,6 @@ public void exceptionCaught(Throwable cause, TransportClient client) { public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) { RpcResponse rpcResponse; String appId = req.getAppId(); - - if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE - && shuffleServer - .getShuffleTaskManager() - .getShuffleTaskInfo(appId) - .isBlockFailureReassignEnabled()) { - req.getPartitionToBlocks().values().stream() - .flatMap(Collection::stream) - .forEach(block -> block.getData().release()); - rpcResponse = - new RpcResponse( - req.getRequestId(), - StatusCode.SERVER_INACTIVE, - "Server is inactive, status: " + shuffleServer.getServerStatus()); - client.getChannel().writeAndFlush(rpcResponse); - return; - } - int shuffleId = req.getShuffleId(); long requireBufferId = req.getRequireId(); long timestamp = req.getTimestamp();