Skip to content

Commit

Permalink
server_inactive for requireBuffer rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
zuston committed Apr 24, 2024
1 parent 9e14c48 commit 2bac7bc
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,22 @@ public void unregisterShuffle(
@Override
public void registerShuffle(
ShuffleRegisterRequest req, StreamObserver<ShuffleRegisterResponse> responseObserver) {
String appId = req.getAppId();
if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
&& shuffleServer
.getShuffleTaskManager()
.getShuffleTaskInfo(appId)
.isBlockFailureReassignEnabled()) {
responseObserver.onNext(
ShuffleRegisterResponse.newBuilder()
.setStatus(StatusCode.SERVER_INACTIVE.toProto())
.setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus())
.build());
responseObserver.onCompleted();
return;
}

ShuffleRegisterResponse reply;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
String remoteStoragePath = req.getRemoteStorage().getPath();
String user = req.getUser();
Expand Down Expand Up @@ -206,22 +219,8 @@ public void registerShuffle(
@Override
public void sendShuffleData(
SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse> responseObserver) {
String appId = req.getAppId();
if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
&& shuffleServer
.getShuffleTaskManager()
.getShuffleTaskInfo(appId)
.isBlockFailureReassignEnabled()) {
responseObserver.onNext(
SendShuffleDataResponse.newBuilder()
.setStatus(StatusCode.SERVER_INACTIVE.toProto())
.setRetMsg("Server is inactive, status: " + shuffleServer.getServerStatus())
.build());
responseObserver.onCompleted();
return;
}

SendShuffleDataResponse reply;
String appId = req.getAppId();
int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireBufferId();
long timestamp = req.getTimestamp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleIndexResult;
Expand Down Expand Up @@ -100,24 +99,6 @@ public void exceptionCaught(Throwable cause, TransportClient client) {
public void handleSendShuffleDataRequest(TransportClient client, SendShuffleDataRequest req) {
RpcResponse rpcResponse;
String appId = req.getAppId();

if (shuffleServer.getServerStatus() != ServerStatus.ACTIVE
&& shuffleServer
.getShuffleTaskManager()
.getShuffleTaskInfo(appId)
.isBlockFailureReassignEnabled()) {
req.getPartitionToBlocks().values().stream()
.flatMap(Collection::stream)
.forEach(block -> block.getData().release());
rpcResponse =
new RpcResponse(
req.getRequestId(),
StatusCode.SERVER_INACTIVE,
"Server is inactive, status: " + shuffleServer.getServerStatus());
client.getChannel().writeAndFlush(rpcResponse);
return;
}

int shuffleId = req.getShuffleId();
long requireBufferId = req.getRequireId();
long timestamp = req.getTimestamp();
Expand Down

0 comments on commit 2bac7bc

Please sign in to comment.