Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,26 @@
*/
package org.apache.ratis.server;

import org.apache.ratis.client.RaftClient;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
import org.apache.ratis.protocol.AdminProtocol;
import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
import org.apache.ratis.protocol.RaftClientProtocol;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
Expand All @@ -37,14 +51,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;

/** Raft server interface */
public interface RaftServer extends Closeable, RpcType.Get,
RaftServerProtocol, RaftServerAsynchronousProtocol,
Expand Down Expand Up @@ -111,9 +117,6 @@ default RaftGroup getGroup() {
/** @return the data stream map of this division. */
DataStreamMap getDataStreamMap();

/** @return the internal {@link RaftClient} of this division. */
RaftClient getRaftClient();

/** @return the {@link ThreadGroup} the threads of this Division belong to. */
ThreadGroup getThreadGroup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,85 @@
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.client.RaftClient;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.ObjectName;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.CandidateInfoProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
import org.apache.ratis.proto.RaftProtos.LeaderInfoProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.ServerRpcProto;
import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.LeaderElectionManagementRequest;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
import org.apache.ratis.protocol.RaftClientProtocol;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.protocol.exceptions.StaleReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
Expand Down Expand Up @@ -65,26 +126,19 @@
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.*;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.ConcurrentUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.JmxRegister;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedSupplier;

import javax.management.ObjectName;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
Expand Down Expand Up @@ -166,8 +220,6 @@ public long[] getFollowerNextIndices() {
private final DataStreamMap dataStreamMap;
private final RaftServerConfigKeys.Read.Option readOption;

private final MemoizedSupplier<RaftClient> raftClient;

private final RetryCacheImpl retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();

Expand Down Expand Up @@ -220,11 +272,6 @@ public long[] getFollowerNextIndices() {
this.startComplete = new AtomicBoolean(false);
this.threadGroup = new ThreadGroup(proxy.getThreadGroup(), getMemberId().toString());

this.raftClient = JavaUtils.memoize(() -> RaftClient.newBuilder()
.setRaftGroup(group)
.setProperties(getRaftServer().getProperties())
.build());

this.transferLeadership = new TransferLeadership(this, properties);
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
Expand Down Expand Up @@ -302,11 +349,6 @@ public DataStreamMap getDataStreamMap() {
return dataStreamMap;
}

@Override
public RaftClient getRaftClient() {
return raftClient.get();
}

@Override
public RetryCacheImpl getRetryCache() {
return retryCache;
Expand Down Expand Up @@ -496,14 +538,6 @@ public void close() {
} catch (Exception ignored) {
LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
}
try {
if (raftClient.isInitialized()) {
raftClient.get().close();
}
} catch (Exception ignored) {
LOG.warn("{}: Failed to close raft client", getMemberId(), ignored);
}

try {
ConcurrentUtils.shutdownAndWait(clientExecutor);
} catch (Exception ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,6 @@ Long runTestDataStream(
.orElseThrow(IllegalStateException::new);
}

ClientId getPrimaryClientId(CLUSTER cluster, RaftPeer primary) {
return cluster.getDivision(primary.getId()).getRaftClient().getId();
}

long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) {
final Iterable<RaftServer> servers = CollectionUtils.as(cluster.getServers(), s -> s);
final RaftPeerId leader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@

package org.apache.ratis.datastream;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
Expand All @@ -40,15 +41,6 @@
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -70,10 +62,9 @@ public void setup() {
RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
}

RaftServer.Division mockDivision(RaftServer server, RaftClient client) {
RaftServer.Division mockDivision(RaftServer server) {
final RaftServer.Division division = mock(RaftServer.Division.class);
when(division.getRaftServer()).thenReturn(server);
when(division.getRaftClient()).thenReturn(client);
when(division.getRaftConf()).thenAnswer(i -> getRaftConf());

final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
Expand Down Expand Up @@ -107,27 +98,8 @@ private void testMockCluster(int numServers, RaftException leaderException,
when(raftServer.getId()).thenReturn(peerId);
when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build());
if (getStateMachineException == null) {
RaftClient client = Mockito.mock(RaftClient.class);
when(client.getId()).thenReturn(clientId);
AsyncRpcApi asyncRpcApi = Mockito.mock(AsyncRpcApi.class);
when(client.async()).thenReturn(asyncRpcApi);

final RaftServer.Division myDivision = mockDivision(raftServer, client);
final RaftServer.Division myDivision = mockDivision(raftServer);
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);

if (submitException != null) {
when(asyncRpcApi.sendForward(Mockito.any(RaftClientRequest.class))).thenThrow(submitException);
} else if (i == 0) {
// primary
when(asyncRpcApi.sendForward(Mockito.any(RaftClientRequest.class)))
.thenAnswer((Answer<CompletableFuture<RaftClientReply>>) invocation -> {
final RaftClientRequest r = (RaftClientRequest) invocation.getArguments()[0];
final RaftClientReply.Builder b = RaftClientReply.newBuilder().setRequest(r);
final RaftClientReply reply = leaderException != null? b.setException(leaderException).build()
: b.setSuccess().setMessage(() -> DataStreamTestUtils.MOCK).build();
return CompletableFuture.completedFuture(reply);
});
}
} else {
when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenThrow(getStateMachineException);
}
Expand Down