Skip to content

Commit

Permalink
Refactor Raft implementation package structure.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 15, 2017
1 parent 2db9469 commit f3741bf
Show file tree
Hide file tree
Showing 54 changed files with 781 additions and 756 deletions.
Expand Up @@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License * limitations under the License
*/ */
package io.atomix.protocols.raft.client; package io.atomix.protocols.raft;


import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.RaftQuery;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License * limitations under the License
*/ */
package io.atomix.protocols.raft.client; package io.atomix.protocols.raft;


import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;


Expand Down
Expand Up @@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License * limitations under the License
*/ */
package io.atomix.protocols.raft.client; package io.atomix.protocols.raft;


import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.client.impl.DefaultRaftClient; import io.atomix.protocols.raft.impl.DefaultRaftClient;
import io.atomix.protocols.raft.protocol.RaftClientProtocol; import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.proxy.RaftProxy; import io.atomix.protocols.raft.proxy.RaftProxy;


Expand Down
Expand Up @@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.atomix.protocols.raft.server; package io.atomix.protocols.raft;


import io.atomix.protocols.raft.RaftOperation;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;


import java.time.Instant; import java.time.Instant;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.atomix.protocols.raft.client; package io.atomix.protocols.raft;


import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.metadata.RaftSessionMetadata; import io.atomix.protocols.raft.metadata.RaftSessionMetadata;
Expand Down
Expand Up @@ -13,15 +13,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.atomix.protocols.raft.server; package io.atomix.protocols.raft;


import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.cluster.RaftCluster; import io.atomix.protocols.raft.cluster.RaftCluster;
import io.atomix.protocols.raft.cluster.RaftMember; import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.error.ConfigurationException; import io.atomix.protocols.raft.error.ConfigurationException;
import io.atomix.protocols.raft.protocol.RaftServerProtocol; import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.server.state.ServerContext; import io.atomix.protocols.raft.impl.RaftServerContext;
import io.atomix.protocols.raft.server.state.StateMachineRegistry; import io.atomix.protocols.raft.roles.StateMachineRegistry;
import io.atomix.protocols.raft.storage.Storage; import io.atomix.protocols.raft.storage.Storage;
import io.atomix.util.concurrent.Futures; import io.atomix.util.concurrent.Futures;
import io.atomix.util.concurrent.AtomixThreadFactory; import io.atomix.util.concurrent.AtomixThreadFactory;
Expand Down Expand Up @@ -200,7 +200,7 @@ public static Builder builder(NodeId localNodeId) {
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public enum State { public enum Role {


/** /**
* Represents the state of an inactive server. * Represents the state of an inactive server.
Expand Down Expand Up @@ -253,13 +253,13 @@ public enum State {


protected final String name; protected final String name;
protected final RaftServerProtocol protocol; protected final RaftServerProtocol protocol;
protected final ServerContext context; protected final RaftServerContext context;
private volatile CompletableFuture<RaftServer> openFuture; private volatile CompletableFuture<RaftServer> openFuture;
private volatile CompletableFuture<Void> closeFuture; private volatile CompletableFuture<Void> closeFuture;
private Consumer<RaftMember> electionListener; private Consumer<RaftMember> electionListener;
private volatile boolean started; private volatile boolean started;


protected RaftServer(String name, RaftServerProtocol protocol, ServerContext context) { protected RaftServer(String name, RaftServerProtocol protocol, RaftServerContext context) {
this.name = checkNotNull(name, "name cannot be null"); this.name = checkNotNull(name, "name cannot be null");
this.protocol = checkNotNull(protocol, "protocol cannot be null"); this.protocol = checkNotNull(protocol, "protocol cannot be null");
this.context = checkNotNull(context, "context cannot be null"); this.context = checkNotNull(context, "context cannot be null");
Expand Down Expand Up @@ -315,22 +315,22 @@ public RaftCluster cluster() {
/** /**
* Returns the Copycat server state. * Returns the Copycat server state.
* <p> * <p>
* The initial state of a Raft server is {@link State#INACTIVE}. Once the server is {@link #bootstrap() started} and * The initial state of a Raft server is {@link Role#INACTIVE}. Once the server is {@link #bootstrap() started} and
* until it is explicitly shutdown, the server will be in one of the active states - {@link State#PASSIVE}, * until it is explicitly shutdown, the server will be in one of the active states - {@link Role#PASSIVE},
* {@link State#FOLLOWER}, {@link State#CANDIDATE}, or {@link State#LEADER}. * {@link Role#FOLLOWER}, {@link Role#CANDIDATE}, or {@link Role#LEADER}.
* *
* @return The Copycat server state. * @return The Copycat server state.
*/ */
public State state() { public Role state() {
return context.getState(); return context.getState();
} }


/** /**
* Registers a state change listener. * Registers a state change listener.
* <p> * <p>
* Throughout the lifetime of the cluster, the server will periodically transition between various {@link RaftServer.State states}. * Throughout the lifetime of the cluster, the server will periodically transition between various {@link Role states}.
* Users can listen for and react to state change events. To determine when this server is elected leader, simply * Users can listen for and react to state change events. To determine when this server is elected leader, simply
* listen for the {@link RaftServer.State#LEADER} state. * listen for the {@link Role#LEADER} state.
* <pre> * <pre>
* {@code * {@code
* server.onStateChange(state -> { * server.onStateChange(state -> {
Expand All @@ -344,7 +344,7 @@ public State state() {
* @param listener The state change listener. * @param listener The state change listener.
* @throws NullPointerException If {@code listener} is {@code null} * @throws NullPointerException If {@code listener} is {@code null}
*/ */
public void addStateChangeListener(Consumer<State> listener) { public void addStateChangeListener(Consumer<Role> listener) {
context.addStateChangeListener(listener); context.addStateChangeListener(listener);
} }


Expand Down Expand Up @@ -578,7 +578,7 @@ public CompletableFuture<Void> shutdown() {
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
context.getThreadContext().execute(() -> { context.getThreadContext().execute(() -> {
started = false; started = false;
context.transition(RaftServer.State.INACTIVE); context.transition(Role.INACTIVE);
future.complete(null); future.complete(null);
}); });


Expand Down Expand Up @@ -818,7 +818,7 @@ public RaftServer build() {
ThreadContext threadContext = new SingleThreadContext(String.format("copycat-server-%s-%s", localNodeId, name)); ThreadContext threadContext = new SingleThreadContext(String.format("copycat-server-%s-%s", localNodeId, name));
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(threadPoolSize, new AtomixThreadFactory("copycat-" + name + "-state-%d")); ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(threadPoolSize, new AtomixThreadFactory("copycat-" + name + "-state-%d"));


ServerContext context = new ServerContext(name, type, localNodeId, protocol, storage, stateMachineRegistry, threadPool, threadContext); RaftServerContext context = new RaftServerContext(name, type, localNodeId, protocol, storage, stateMachineRegistry, threadPool, threadContext);
context.setElectionTimeout(electionTimeout) context.setElectionTimeout(electionTimeout)
.setHeartbeatInterval(heartbeatInterval) .setHeartbeatInterval(heartbeatInterval)
.setSessionTimeout(sessionTimeout); .setSessionTimeout(sessionTimeout);
Expand Down
Expand Up @@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.atomix.protocols.raft.server; package io.atomix.protocols.raft;


import io.atomix.protocols.raft.RaftOperation;
import io.atomix.protocols.raft.error.CommandException; import io.atomix.protocols.raft.error.CommandException;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.session.RaftSessionListener; import io.atomix.protocols.raft.session.RaftSessionListener;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License * limitations under the License
*/ */
package io.atomix.protocols.raft.server; package io.atomix.protocols.raft;


import io.atomix.protocols.raft.storage.snapshot.SnapshotReader; import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter; import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */


package io.atomix.protocols.raft.server; package io.atomix.protocols.raft;


import io.atomix.protocols.raft.session.RaftSessions; import io.atomix.protocols.raft.session.RaftSessions;


Expand Down
Expand Up @@ -14,9 +14,8 @@
* limitations under the License. * limitations under the License.
*/ */


package io.atomix.protocols.raft.server; package io.atomix.protocols.raft;


import io.atomix.protocols.raft.RaftOperation;
import io.atomix.util.concurrent.ThreadContext; import io.atomix.util.concurrent.ThreadContext;


import java.util.function.Consumer; import java.util.function.Consumer;
Expand Down

This file was deleted.

Expand Up @@ -16,7 +16,7 @@
package io.atomix.protocols.raft.cluster; package io.atomix.protocols.raft.cluster;


import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.server.RaftServer; import io.atomix.protocols.raft.RaftServer;


import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License * limitations under the License
*/ */
package io.atomix.protocols.raft.server.state; package io.atomix.protocols.raft.cluster.impl;


import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import io.atomix.cluster.NodeId; import io.atomix.cluster.NodeId;
Expand All @@ -39,18 +39,18 @@
* *
* @author <a href="http://github.com/kuujo>Jordan Halterman</a> * @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/ */
public final class RaftMemberState implements RaftMember, AutoCloseable { public final class DefaultRaftMember implements RaftMember, AutoCloseable {
private final NodeId id; private final NodeId id;
private final transient int hash; private final transient int hash;
private RaftMember.Type type; private RaftMember.Type type;
private Status status = Status.AVAILABLE; private Status status = Status.AVAILABLE;
private Instant updated; private Instant updated;
private transient Scheduled configureTimeout; private transient Scheduled configureTimeout;
private transient RaftClusterState cluster; private transient RaftClusterContext cluster;
private transient Set<Consumer<Type>> typeChangeListeners = new CopyOnWriteArraySet<>(); private transient Set<Consumer<Type>> typeChangeListeners = new CopyOnWriteArraySet<>();
private transient Set<Consumer<Status>> statusChangeListeners = new CopyOnWriteArraySet<>(); private transient Set<Consumer<Status>> statusChangeListeners = new CopyOnWriteArraySet<>();


public RaftMemberState(NodeId id, RaftMember.Type type, RaftMember.Status status, Instant updated) { public DefaultRaftMember(NodeId id, RaftMember.Type type, RaftMember.Status status, Instant updated) {
this.id = checkNotNull(id, "id cannot be null"); this.id = checkNotNull(id, "id cannot be null");
this.hash = Hashing.murmur3_32() this.hash = Hashing.murmur3_32()
.hashUnencodedChars(id.id()) .hashUnencodedChars(id.id())
Expand All @@ -63,7 +63,7 @@ public RaftMemberState(NodeId id, RaftMember.Type type, RaftMember.Status status
/** /**
* Sets the member's parent cluster. * Sets the member's parent cluster.
*/ */
RaftMemberState setCluster(RaftClusterState cluster) { DefaultRaftMember setCluster(RaftClusterContext cluster) {
this.cluster = cluster; this.cluster = cluster;
return this; return this;
} }
Expand Down Expand Up @@ -144,7 +144,7 @@ public CompletableFuture<Void> remove() {
* @param type The member type. * @param type The member type.
* @return The member. * @return The member.
*/ */
RaftMemberState update(RaftMember.Type type, Instant time) { public DefaultRaftMember update(RaftMember.Type type, Instant time) {
if (this.type != type) { if (this.type != type) {
this.type = checkNotNull(type, "type cannot be null"); this.type = checkNotNull(type, "type cannot be null");
if (time.isAfter(updated)) { if (time.isAfter(updated)) {
Expand All @@ -163,7 +163,7 @@ RaftMemberState update(RaftMember.Type type, Instant time) {
* @param status The member status. * @param status The member status.
* @return The member. * @return The member.
*/ */
RaftMemberState update(Status status, Instant time) { public DefaultRaftMember update(Status status, Instant time) {
if (this.status != status) { if (this.status != status) {
this.status = checkNotNull(status, "status cannot be null"); this.status = checkNotNull(status, "status cannot be null");
if (time.isAfter(updated)) { if (time.isAfter(updated)) {
Expand Down Expand Up @@ -200,7 +200,7 @@ private void configure(RaftMember.Type type, CompletableFuture<Void> future) {
cluster.getContext().getServerState().reconfigure(ReconfigureRequest.builder() cluster.getContext().getServerState().reconfigure(ReconfigureRequest.builder()
.withIndex(cluster.getConfiguration().index()) .withIndex(cluster.getConfiguration().index())
.withTerm(cluster.getConfiguration().term()) .withTerm(cluster.getConfiguration().term())
.withMember(new RaftMemberState(id, type, status, updated)) .withMember(new DefaultRaftMember(id, type, status, updated))
.build()).whenComplete((response, error) -> { .build()).whenComplete((response, error) -> {
if (error == null) { if (error == null) {
if (response.status() == RaftResponse.Status.OK) { if (response.status() == RaftResponse.Status.OK) {
Expand Down Expand Up @@ -240,7 +240,7 @@ public int hashCode() {


@Override @Override
public boolean equals(Object object) { public boolean equals(Object object) {
return object instanceof RaftMemberState && ((RaftMemberState) object).id.equals(id); return object instanceof DefaultRaftMember && ((DefaultRaftMember) object).id.equals(id);
} }


@Override @Override
Expand Down

0 comments on commit f3741bf

Please sign in to comment.