From decdb17ad54e1b1f10e02090d820f92c27846e56 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 14 Jun 2023 10:51:02 +0800 Subject: [PATCH] RATIS-XXX. Remove unused getRaftClient Signed-off-by: tison --- .../org/apache/ratis/server/RaftServer.java | 29 ++-- .../ratis/server/impl/RaftServerImpl.java | 124 +++++++++++------- .../DataStreamAsyncClusterTests.java | 4 - .../TestNettyDataStreamWithMock.java | 42 +----- 4 files changed, 102 insertions(+), 97 deletions(-) diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java index 1c99e88d88..84e3a1ed30 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java @@ -17,12 +17,26 @@ */ package org.apache.ratis.server; -import org.apache.ratis.client.RaftClient; +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.Objects; +import java.util.Optional; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos.CommitInfoProto; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; -import org.apache.ratis.protocol.*; +import org.apache.ratis.protocol.AdminAsynchronousProtocol; +import org.apache.ratis.protocol.AdminProtocol; +import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; +import org.apache.ratis.protocol.RaftClientProtocol; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.metrics.RaftServerMetrics; import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol; @@ -37,14 +51,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Collection; -import java.util.Objects; -import java.util.Optional; - /** Raft server interface */ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, RaftServerAsynchronousProtocol, @@ -111,9 +117,6 @@ default RaftGroup getGroup() { /** @return the data stream map of this division. */ DataStreamMap getDataStreamMap(); - /** @return the internal {@link RaftClient} of this division. */ - RaftClient getRaftClient(); - /** @return the {@link ThreadGroup} the threads of this Division belong to. */ ThreadGroup getThreadGroup(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 90641b73eb..e429518e66 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -17,24 +17,85 @@ */ package org.apache.ratis.server.impl; -import org.apache.ratis.client.RaftClient; +import java.io.File; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.management.ObjectName; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.Timekeeper; -import org.apache.ratis.proto.RaftProtos.*; +import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; +import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.proto.RaftProtos.CandidateInfoProto; +import org.apache.ratis.proto.RaftProtos.CommitInfoProto; +import org.apache.ratis.proto.RaftProtos.FollowerInfoProto; +import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; +import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult; +import org.apache.ratis.proto.RaftProtos.LeaderInfoProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase; -import org.apache.ratis.protocol.*; -import org.apache.ratis.protocol.exceptions.ReadException; -import org.apache.ratis.protocol.exceptions.ReadIndexException; -import org.apache.ratis.protocol.exceptions.SetConfigurationException; +import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto; +import org.apache.ratis.proto.RaftProtos.RaftPeerRole; +import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto; +import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto; +import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.proto.RaftProtos.ServerRpcProto; +import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto; +import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto; +import org.apache.ratis.protocol.ClientInvocationId; +import org.apache.ratis.protocol.GroupInfoReply; +import org.apache.ratis.protocol.GroupInfoRequest; +import org.apache.ratis.protocol.LeaderElectionManagementRequest; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientAsynchronousProtocol; +import org.apache.ratis.protocol.RaftClientProtocol; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.SnapshotManagementRequest; +import org.apache.ratis.protocol.TransferLeadershipRequest; import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.protocol.exceptions.RaftException; +import org.apache.ratis.protocol.exceptions.ReadException; +import org.apache.ratis.protocol.exceptions.ReadIndexException; import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException; import org.apache.ratis.protocol.exceptions.ResourceUnavailableException; import org.apache.ratis.protocol.exceptions.ServerNotReadyException; +import org.apache.ratis.protocol.exceptions.SetConfigurationException; import org.apache.ratis.protocol.exceptions.StaleReadException; import org.apache.ratis.protocol.exceptions.StateMachineException; import org.apache.ratis.protocol.exceptions.TransferLeadershipException; @@ -65,26 +126,19 @@ import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; -import org.apache.ratis.util.*; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.CollectionUtils; +import org.apache.ratis.util.ConcurrentUtils; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.JmxRegister; +import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.Timestamp; import org.apache.ratis.util.function.CheckedSupplier; - -import javax.management.ObjectName; -import java.io.File; -import java.io.IOException; -import java.nio.file.NoSuchFileException; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY; import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER; import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS; @@ -166,8 +220,6 @@ public long[] getFollowerNextIndices() { private final DataStreamMap dataStreamMap; private final RaftServerConfigKeys.Read.Option readOption; - private final MemoizedSupplier raftClient; - private final RetryCacheImpl retryCache; private final CommitInfoCache commitInfoCache = new CommitInfoCache(); @@ -220,11 +272,6 @@ public long[] getFollowerNextIndices() { this.startComplete = new AtomicBoolean(false); this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString()); - this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder() - .setRaftGroup(group) - .setProperties(getRaftServer().getProperties()) - .build()); - this.transferLeadership = new TransferLeadership(this, properties); this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); @@ -302,11 +349,6 @@ public DataStreamMap getDataStreamMap() { return dataStreamMap; } - @Override - public RaftClient getRaftClient() { - return raftClient.get(); - } - @Override public RetryCacheImpl getRetryCache() { return retryCache; @@ -496,14 +538,6 @@ public void close() { } catch (Exception ignored) { LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored); } - try { - if (raftClient.isInitialized()) { - raftClient.get().close(); - } - } catch (Exception ignored) { - LOG.warn("{}: Failed to close raft client", getMemberId(), ignored); - } - try { ConcurrentUtils.shutdownAndWait(clientExecutor); } catch (Exception ignored) { diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java index c18f7dea6f..86b03a2dae 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java @@ -139,10 +139,6 @@ Long runTestDataStream( .orElseThrow(IllegalStateException::new); } - ClientId getPrimaryClientId(CLUSTER cluster, RaftPeer primary) { - return cluster.getDivision(primary.getId()).getRaftClient().getId(); - } - long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) { final Iterable servers = CollectionUtils.as(cluster.getServers(), s -> s); final RaftPeerId leader; diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java index 324acfa7c8..64fa59e40e 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java @@ -18,15 +18,16 @@ package org.apache.ratis.datastream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.apache.ratis.RaftConfigKeys; -import org.apache.ratis.client.AsyncRpcApi; -import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine; import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -40,15 +41,6 @@ import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -70,10 +62,9 @@ public void setup() { RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY); } - RaftServer.Division mockDivision(RaftServer server, RaftClient client) { + RaftServer.Division mockDivision(RaftServer server) { final RaftServer.Division division = mock(RaftServer.Division.class); when(division.getRaftServer()).thenReturn(server); - when(division.getRaftClient()).thenReturn(client); when(division.getRaftConf()).thenAnswer(i -> getRaftConf()); final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine(); @@ -107,27 +98,8 @@ private void testMockCluster(int numServers, RaftException leaderException, when(raftServer.getId()).thenReturn(peerId); when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build()); if (getStateMachineException == null) { - RaftClient client = Mockito.mock(RaftClient.class); - when(client.getId()).thenReturn(clientId); - AsyncRpcApi asyncRpcApi = Mockito.mock(AsyncRpcApi.class); - when(client.async()).thenReturn(asyncRpcApi); - - final RaftServer.Division myDivision = mockDivision(raftServer, client); + final RaftServer.Division myDivision = mockDivision(raftServer); when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision); - - if (submitException != null) { - when(asyncRpcApi.sendForward(Mockito.any(RaftClientRequest.class))).thenThrow(submitException); - } else if (i == 0) { - // primary - when(asyncRpcApi.sendForward(Mockito.any(RaftClientRequest.class))) - .thenAnswer((Answer>) invocation -> { - final RaftClientRequest r = (RaftClientRequest) invocation.getArguments()[0]; - final RaftClientReply.Builder b = RaftClientReply.newBuilder().setRequest(r); - final RaftClientReply reply = leaderException != null? b.setException(leaderException).build() - : b.setSuccess().setMessage(() -> DataStreamTestUtils.MOCK).build(); - return CompletableFuture.completedFuture(reply); - }); - } } else { when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException); }