From 7e44d4c756d5568d5dc27801b2bd72c290a4b13c Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 13 Oct 2023 18:47:53 -0700 Subject: [PATCH] RATIS-1904. Refactor RaftServerImpl.submitClientRequestAsync(..). --- .../ratis/protocol/RaftClientRequest.java | 24 ++- .../ratis/server/impl/RaftServerImpl.java | 195 +++++++++--------- 2 files changed, 123 insertions(+), 96 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 220694ce0a..3c3ae1ba08 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -160,8 +160,24 @@ private Type(WatchRequestTypeProto watch) { this(WATCH, watch); } - public boolean is(RaftClientRequestProto.TypeCase tCase) { - return getTypeCase().equals(tCase); + public boolean is(RaftClientRequestProto.TypeCase t) { + return getTypeCase() == t; + } + + public boolean isReadOnly() { + switch (getTypeCase()) { + case READ: + case STALEREAD: + case WATCH: + return true; + case WRITE: + case MESSAGESTREAM: + case DATASTREAM: + case FORWARD: + return false; + default: + throw new IllegalStateException("Unexpected type case: " + getTypeCase()); + } } public RaftClientRequestProto.TypeCase getTypeCase() { @@ -393,6 +409,10 @@ public boolean is(RaftClientRequestProto.TypeCase typeCase) { return getType().is(typeCase); } + public boolean isReadOnly() { + return getType().isReadOnly(); + } + public RoutingTable getRoutingTable() { return routingTable; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 6127972cb6..667e611b42 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -416,7 +416,7 @@ static boolean registerMBean( * The peer belongs to the current configuration, should start as a follower or listener */ private void startAsPeer(RaftPeerRole newRole) { - Object reason = ""; + final Object reason; if (newRole == RaftPeerRole.FOLLOWER) { reason = "startAsFollower"; setRole(RaftPeerRole.FOLLOWER, reason); @@ -485,8 +485,8 @@ void groupRemove(boolean deleteDirectory, boolean renameDirectory) { break; } catch (NoSuchFileException e) { LOG.warn("{}: Some file does not exist {}", getMemberId(), dir, e); - } catch (Exception ignored) { - LOG.error("{}: Failed to remove RaftStorageDirectory {}", getMemberId(), dir, ignored); + } catch (Exception e) { + LOG.error("{}: Failed to remove RaftStorageDirectory {}", getMemberId(), dir, e); break; } } @@ -514,45 +514,45 @@ public void close() { LOG.info("{}: shutdown", getMemberId()); try { jmxAdapter.unregister(); - } catch (Exception ignored) { - LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), ignored); + } catch (Exception e) { + LOG.warn("{}: Failed to un-register RaftServer JMX bean", getMemberId(), e); } try { role.shutdownFollowerState(); - } catch (Exception ignored) { - LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), ignored); + } catch (Exception e) { + LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(), e); } try{ role.shutdownLeaderElection(); - } catch (Exception ignored) { - LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), ignored); + } catch (Exception e) { + LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e); } try{ role.shutdownLeaderState(true); - } catch (Exception ignored) { - LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), ignored); + } catch (Exception e) { + LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), e); } try{ state.close(); - } catch (Exception ignored) { - LOG.warn("{}: Failed to close state", getMemberId(), ignored); + } catch (Exception e) { + LOG.warn("{}: Failed to close state", getMemberId(), e); } try { leaderElectionMetrics.unregister(); raftServerMetrics.unregister(); RaftServerMetricsImpl.removeRaftServerMetrics(getMemberId()); - } catch (Exception ignored) { - LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored); + } catch (Exception e) { + LOG.warn("{}: Failed to unregister metric", getMemberId(), e); } try { ConcurrentUtils.shutdownAndWait(clientExecutor); - } catch (Exception ignored) { - LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", ignored); + } catch (Exception e) { + LOG.warn(getMemberId() + ": Failed to shutdown clientExecutor", e); } try { ConcurrentUtils.shutdownAndWait(serverExecutor); - } catch (Exception ignored) { - LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", ignored); + } catch (Exception e) { + LOG.warn(getMemberId() + ": Failed to shutdown serverExecutor", e); } }); } @@ -740,11 +740,14 @@ RaftClientReply newExceptionReply(RaftClientRequest request, RaftException excep .build(); } + private CompletableFuture checkLeaderState(RaftClientRequest request) { + return checkLeaderState(request, null); + } + /** * @return null if the server is in leader state. */ - private CompletableFuture checkLeaderState(RaftClientRequest request, CacheEntry entry, - boolean isWrite) { + private CompletableFuture checkLeaderState(RaftClientRequest request, CacheEntry entry) { try { assertGroup(request.getRequestorId(), request.getRaftGroupId()); } catch (GroupMismatchException e) { @@ -766,7 +769,7 @@ private CompletableFuture checkLeaderState(RaftClientRequest re return RetryCacheImpl.failWithReply(reply, entry); } - if (isWrite && isSteppingDown()) { + if (!request.isReadOnly() && isSteppingDown()) { final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down"); final RaftClientReply reply = newExceptionReply(request, lsde); return RetryCacheImpl.failWithReply(reply, entry); @@ -790,10 +793,9 @@ NotLeaderException generateNotLeaderException() { return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), peers); } - LifeCycle.State assertLifeCycleState(Set expected) throws ServerNotReadyException { - return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException( - getMemberId() + " is not in " + expected + ": current state is " + c), - expected); + void assertLifeCycleState(Set expected) throws ServerNotReadyException { + lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException( + getMemberId() + " is not in " + expected + ": current state is " + c), expected); } void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupMismatchException { @@ -814,11 +816,10 @@ private CompletableFuture appendTransaction( request.getClientId(), request, context, cacheEntry); assertLifeCycleState(LifeCycle.States.RUNNING); - CompletableFuture reply; final PendingRequest pending; synchronized (this) { - reply = checkLeaderState(request, cacheEntry, true); + final CompletableFuture reply = checkLeaderState(request, cacheEntry); if (reply != null) { return reply; } @@ -890,75 +891,67 @@ public CompletableFuture submitClientRequestAsync( LOG.debug("{}: receive client request({})", getMemberId(), request); final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType()); final Optional timerContext = Optional.ofNullable(timer).map(Timekeeper::time); - - final CompletableFuture replyFuture; - - if (request.is(TypeCase.STALEREAD)) { - replyFuture = staleReadAsync(request); - } else if (request.is(TypeCase.READ)) { - replyFuture = readAsync(request); - } else { - // first check the server's leader state - CompletableFuture reply = checkLeaderState(request, null, - !request.is(TypeCase.READ) && !request.is(TypeCase.WATCH)); - if (reply != null) { - return reply; + return replyFuture(request).whenComplete((clientReply, exception) -> { + timerContext.ifPresent(Timekeeper.Context::stop); + if (exception != null || clientReply.getException() != null) { + raftServerMetrics.incFailedRequestCount(request.getType()); } + }); + } - // let the state machine handle read-only request from client - RaftClientRequest.Type type = request.getType(); - if (type.is(TypeCase.MESSAGESTREAM)) { - if (type.getMessageStream().getEndOfRequest()) { - final CompletableFuture f = streamEndOfRequestAsync(request); - if (f.isCompletedExceptionally()) { - return f.thenApply(r -> null); - } - request = f.join(); - type = request.getType(); - } - } + private CompletableFuture replyFuture(RaftClientRequest request) throws IOException { + final TypeCase type = request.getType().getTypeCase(); + switch (type) { + case STALEREAD: + return staleReadAsync(request); + case READ: + return readAsync(request); + case WATCH: + return watchAsync(request); + case MESSAGESTREAM: + return messageStreamAsync(request); + case WRITE: + case FORWARD: + return writeAsync(request); + default: + throw new IllegalStateException("Unexpected request type: " + type + ", request=" + request); + } + } - if (type.is(TypeCase.WATCH)) { - replyFuture = watchAsync(request); - } else if (type.is(TypeCase.MESSAGESTREAM)) { - replyFuture = streamAsync(request); - } else { - // query the retry cache - final RetryCacheImpl.CacheQueryResult queryResult = retryCache.queryCache(ClientInvocationId.valueOf(request)); - final CacheEntry cacheEntry = queryResult.getEntry(); - if (queryResult.isRetry()) { - // if the previous attempt is still pending or it succeeded, return its - // future - replyFuture = cacheEntry.getReplyFuture(); - } else { - // TODO: this client request will not be added to pending requests until - // later which means that any failure in between will leave partial state in - // the state machine. We should call cancelTransaction() for failed requests - final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction( - filterDataStreamRaftClientRequest(request)); - if (context.getException() != null) { - final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); - final RaftClientReply exceptionReply = newExceptionReply(request, e); - cacheEntry.failWithReply(exceptionReply); - replyFuture = CompletableFuture.completedFuture(exceptionReply); - } else { - replyFuture = appendTransaction(request, context, cacheEntry); - } - } - } + private CompletableFuture writeAsync(RaftClientRequest request) throws IOException { + final CompletableFuture reply = checkLeaderState(request); + if (reply != null) { + return reply; } - final RaftClientRequest.Type type = request.getType(); - replyFuture.whenComplete((clientReply, exception) -> { - timerContext.ifPresent(Timekeeper.Context::stop); - if (exception != null || clientReply.getException() != null) { - raftServerMetrics.incFailedRequestCount(type); - } - }); - return replyFuture; + // query the retry cache + final RetryCacheImpl.CacheQueryResult queryResult = retryCache.queryCache(ClientInvocationId.valueOf(request)); + final CacheEntry cacheEntry = queryResult.getEntry(); + if (queryResult.isRetry()) { + // return the cached future. + return cacheEntry.getReplyFuture(); + } + // TODO: this client request will not be added to pending requests until + // later which means that any failure in between will leave partial state in + // the state machine. We should call cancelTransaction() for failed requests + final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction( + filterDataStreamRaftClientRequest(request)); + if (context.getException() != null) { + final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); + final RaftClientReply exceptionReply = newExceptionReply(request, e); + cacheEntry.failWithReply(exceptionReply); + return CompletableFuture.completedFuture(exceptionReply); + } + + return appendTransaction(request, context, cacheEntry); } private CompletableFuture watchAsync(RaftClientRequest request) { + final CompletableFuture reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + return role.getLeaderState() .map(ls -> ls.addWatchReqeust(request)) .orElseGet(() -> CompletableFuture.completedFuture( @@ -1003,7 +996,7 @@ private CompletableFuture getReadIndex(RaftClientRequest request, LeaderSt private CompletableFuture readAsync(RaftClientRequest request) { if (request.getType().getRead().getPreferNonLinearizable() || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) { - final CompletableFuture reply = checkLeaderState(request, null, false); + final CompletableFuture reply = checkLeaderState(request); if (reply != null) { return reply; } @@ -1053,7 +1046,21 @@ private RaftClientReply readException2Reply(RaftClientRequest request, Throwable } } - private CompletableFuture streamAsync(RaftClientRequest request) { + private CompletableFuture messageStreamAsync(RaftClientRequest request) throws IOException { + final CompletableFuture reply = checkLeaderState(request); + if (reply != null) { + return reply; + } + + if (request.getType().getMessageStream().getEndOfRequest()) { + final CompletableFuture f = streamEndOfRequestAsync(request); + if (f.isCompletedExceptionally()) { + return f.thenApply(r -> null); + } + // the message stream has ended and the request become a WRITE request + return replyFuture(f.join()); + } + return role.getLeaderState() .map(ls -> ls.streamAsync(request)) .orElseGet(() -> CompletableFuture.completedFuture( @@ -1146,7 +1153,7 @@ CompletableFuture transferLeadershipAsync(TransferLeadershipReq assertGroup(request.getRequestorId(), request.getRaftGroupId()); synchronized (this) { - CompletableFuture reply = checkLeaderState(request, null, false); + CompletableFuture reply = checkLeaderState(request); if (reply != null) { return reply; } @@ -1254,7 +1261,7 @@ public CompletableFuture setConfigurationAsync(SetConfiguration assertLifeCycleState(LifeCycle.States.RUNNING); assertGroup(request.getRequestorId(), request.getRaftGroupId()); - CompletableFuture reply = checkLeaderState(request, null, true); + CompletableFuture reply = checkLeaderState(request); if (reply != null) { return reply; } @@ -1262,7 +1269,7 @@ public CompletableFuture setConfigurationAsync(SetConfiguration final SetConfigurationRequest.Arguments arguments = request.getArguments(); final PendingRequest pending; synchronized (this) { - reply = checkLeaderState(request, null, false); + reply = checkLeaderState(request); if (reply != null) { return reply; }