Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-624. RaftServer should support pause/ unpause in its LifeCycle state #183

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.ratis.client.impl.ClientImplUtils;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseReplyProto;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseRequestProto;
import org.apache.ratis.protocol.*;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
Expand Down Expand Up @@ -101,6 +103,8 @@ RaftClientReply groupRemove(RaftGroupId groupId, boolean deleteDirectory,
/** Send getGroupInfo request to the given server.*/
GroupInfoReply getGroupInfo(RaftGroupId group, RaftPeerId server) throws IOException;

PauseUnpauseReplyProto pause(PauseUnpauseRequestProto request, RaftPeerId server) throws IOException;

/** @return a {@link Builder}. */
static Builder newBuilder() {
return new Builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.client.api.StreamApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseReplyProto;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
Expand Down Expand Up @@ -218,7 +220,7 @@ public RaftClientReply sendWatch(long index, ReplicationLevel replication) throw

private RaftClientReply send(RaftClientRequest.Type type, Message message, RaftPeerId server)
throws IOException {
if (!type.is(TypeCase.WATCH)) {
if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.PAUSEUNPAUSE)) {
Objects.requireNonNull(message, "message == null");
}

Expand Down Expand Up @@ -279,6 +281,17 @@ public GroupInfoReply getGroupInfo(RaftGroupId raftGroupId, RaftPeerId server) t
return (GroupInfoReply)reply;
}

@Override
public PauseUnpauseReplyProto pause(
PauseUnpauseRequestProto request, RaftPeerId server) throws IOException {
final RaftClientReply reply = sendPause(server);
return null;
}

private RaftClientReply sendPause(RaftPeerId server) throws IOException {
return send(RaftClientRequest.pauseUnpauseType(true), null, server);
}

private void addServers(Stream<RaftPeer> peersInNewConf) {
clientRpc.addServers(
peersInNewConf.filter(p -> !peers.contains(p))::iterator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public RaftClientReply(RaftClientRequest request, NotReplicatedException nre,
request.getCallId(), false, request.getMessage(), nre, nre.getLogIndex(), commitInfos);
}

public RaftClientReply(RaftClientRequest request) {
this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
request.getCallId(), true, null, null, 0L, null);
}

/**
* Get the commit information for the entire group.
* The commit information may be unavailable for exception reply.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public static Type streamRequestType(long streamId, long messageId, boolean endO
.build());
}

public static Type pauseUnpauseType(boolean pause) {
return new Type(
PauseUnpauseRequestProto.newBuilder().setPause(pause).build()
);
}

public static Type readRequestType() {
return DEFAULT_READ;
}
Expand Down Expand Up @@ -87,6 +93,10 @@ public static Type valueOf(StreamRequestTypeProto stream) {
return streamRequestType(stream.getStreamId(), stream.getMessageId(), stream.getEndOfRequest());
}

public static Type valueOf(PauseUnpauseRequestProto request) {
return pauseUnpauseType(request.getPause());
}

/**
* The type case of the proto.
* Only the corresponding proto (must be non-null) is used.
Expand Down Expand Up @@ -120,6 +130,10 @@ private Type(WatchRequestTypeProto watch) {
this(WATCH, watch);
}

private Type(PauseUnpauseRequestProto request) {
this(PAUSEUNPAUSE, request);
}

public boolean is(RaftClientRequestProto.TypeCase tCase) {
return getTypeCase().equals(tCase);
}
Expand Down Expand Up @@ -153,6 +167,11 @@ public WatchRequestTypeProto getWatch() {
return (WatchRequestTypeProto)proto;
}

public PauseUnpauseRequestProto getPauseUnpause() {
Preconditions.assertTrue(is(PAUSEUNPAUSE));
return (PauseUnpauseRequestProto)proto;
}

public static String toString(ReplicationLevel replication) {
return replication == ReplicationLevel.MAJORITY? "": "-" + replication;
}
Expand Down
17 changes: 17 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;

import org.apache.ratis.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.ratis.util.function.CheckedRunnable;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -82,6 +83,10 @@ public boolean isClosingOrClosed() {
return States.CLOSING_OR_CLOSED.contains(this);
}

public boolean isPausingOrPaused() {
return States.PAUSING_OR_PAUSED.contains(this);
}

static void put(State key, Map<State, List<State>> map, State... values) {
map.put(key, Collections.unmodifiableList(Arrays.asList(values)));
}
Expand Down Expand Up @@ -131,6 +136,9 @@ public static final class States {
public static final Set<State> CLOSING_OR_CLOSED_OR_EXCEPTION
= Collections.unmodifiableSet(EnumSet.of(State.CLOSING, State.CLOSED, State.EXCEPTION));

public static final Set<State> PAUSING_OR_PAUSED
= Collections.unmodifiableSet(EnumSet.of(State.PAUSING, State.PAUSED));

private States() {
// no instances
}
Expand Down Expand Up @@ -220,6 +228,15 @@ public final <T extends Throwable> void startAndTransition(
}
}

public State checkStateAndPause() {
assertCurrentState(ImmutableSet.of(State.RUNNING));
if (compareAndTransition(State.RUNNING, State.PAUSING)) {
return State.PAUSING;
}

return null;
}

/**
* Check the current state and, if applicable, transit to {@link State#CLOSING}.
* If this is already in {@link State#CLOSING} or {@link State#CLOSED},
Expand Down
9 changes: 9 additions & 0 deletions ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ message ClientMessageEntryProto {
bytes content = 1;
}

message PauseUnpauseRequestProto {
bool pause = 1;
}

message PauseUnpauseReplyProto {
bool success = 15;
}

enum ReplicationLevel {
/** Committed at the leader and replicated to the majority of peers. */
MAJORITY = 0;
Expand Down Expand Up @@ -275,6 +283,7 @@ message RaftClientRequestProto {
StaleReadRequestTypeProto staleRead = 5;
WatchRequestTypeProto watch = 6;
StreamRequestTypeProto stream = 7;
PauseUnpauseRequestProto pauseUnpause = 8;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseReplyProto;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseRequestProto;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerPauseUnpauseProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.LifeCycle;
Expand All @@ -36,7 +39,7 @@
public interface RaftServer extends Closeable, RpcType.Get,
RaftServerProtocol, RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol,
AdminProtocol, AdminAsynchronousProtocol {
AdminProtocol, AdminAsynchronousProtocol, RaftServerPauseUnpauseProtocol {

/** @return the server ID. */
RaftPeerId getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseReplyProto;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
Expand Down Expand Up @@ -356,6 +358,13 @@ public RaftClientReply groupManagement(GroupManagementRequest request) throws IO
e -> new RaftClientReply(request, e, null));
}

@Override
public RaftClientReply requestPauseUnpause(PauseUnpauseRequestProto request) throws IOException {
if (lifeCycle.checkStateAndPause() != null) {
return new RaftClientReply();
}
}

@Override
public CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request) {
final RaftGroupId groupId = request.getRaftGroupId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.ratis.server.protocol;

import java.io.IOException;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseRequestProto;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseReplyProto;
import org.apache.ratis.protocol.RaftClientReply;

public interface RaftServerPauseUnpauseProtocol {

RaftClientReply requestPauseUnpause(PauseUnpauseRequestProto request) throws IOException;

}
34 changes: 34 additions & 0 deletions ratis-server/src/test/java/org/apache/ratis/PauseUnpauseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.apache.ratis;

import java.io.IOException;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseReplyProto;
import org.apache.ratis.proto.RaftProtos.PauseUnpauseRequestProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.junit.Test;

public abstract class PauseUnpauseTest<CLUSTER extends MiniRaftCluster> extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {

public static final int NUM_SERVERS = 3;

@Test
public void testPause() throws Exception {
runWithNewCluster(NUM_SERVERS, this::runPauseTest);
}

void runPauseTest(CLUSTER cluster) throws InterruptedException {
RaftTestUtil.waitForLeader(cluster);
RaftPeerId server = cluster.getFollowers().get(0).getId();

try (RaftClient raftclient = cluster.createClient()) {
PauseUnpauseReplyProto replyProto=
raftclient.pause(
PauseUnpauseRequestProto.newBuilder().setPause(true).build(),
server);
System.out.println("---------: " + replyProto.getSuccess());
} catch (IOException e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.protocol.GroupInfoRequest;
Expand Down Expand Up @@ -177,6 +178,9 @@ public RaftClientReply handleRequest(RaftClientRequest request)
server.getGroupInfo((GroupInfoRequest) request));
} else if (request instanceof SetConfigurationRequest) {
future = server.setConfigurationAsync((SetConfigurationRequest) request);
} else if (request.getType().is(TypeCase.PAUSEUNPAUSE)) {
future = CompletableFuture.completedFuture(
server.requestPauseUnpause(request.getType().getPauseUnpause()));
} else {
future = server.submitClientRequestAsync(request);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.ratis.server.simulation;

import org.apache.ratis.PauseUnpauseTest;

public class TestPauseUnpauseWithSimulatedRpc
extends PauseUnpauseTest<MiniRaftClusterWithSimulatedRpc>
implements MiniRaftClusterWithSimulatedRpc.FactoryGet {

}