Skip to content

Commit

Permalink
Remove usage of Listener/Listeners interfaces in Raft protocol implem…
Browse files Browse the repository at this point in the history
…entation.
  • Loading branch information
kuujo committed Jun 14, 2017
1 parent 3175e3d commit 4846033
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 128 deletions.
Expand Up @@ -17,7 +17,6 @@

import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.server.RaftServer;
import io.atomix.util.temp.Listener;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -96,7 +95,7 @@ public interface RaftCluster {
long term();

/**
* Registers a callback to be called when a leader is elected.
* Adds a listener to be called when a leader is elected.
* <p>
* The provided {@code callback} will be called when a new leader is elected for a term. Because Raft ensures only a single leader
* can be elected for any given term, each election callback will be called at most once per term. However, note that a leader may
Expand All @@ -112,10 +111,16 @@ public interface RaftCluster {
* a member of the {@link RaftCluster}. When a leader election callback is called, the correct {@link #term()} for the leader is guaranteed
* to have already been set. Thus, to get the term for the provided leader, simply read the cluster {@link #term()}.
*
* @param callback The callback to be called when a new leader is elected.
* @return The leader election listener.
* @param listener The listener to be called when a new leader is elected.
*/
Listener<RaftMember> onLeaderElection(Consumer<RaftMember> callback);
void addLeaderElectionListener(Consumer<RaftMember> listener);

/**
* Removes a leader election listener from the cluster.
*
* @param listener The leader election listener to remove.
*/
void removeLeaderElectionListener(Consumer<RaftMember> listener);

/**
* Returns the local cluster member.
Expand Down Expand Up @@ -296,23 +301,17 @@ default CompletableFuture<Void> join(NodeId... cluster) {
* changes are sequentially consistent, meaning each server in the cluster will see members join in the same
* order, but different servers may see members join at different points in time. Users should not in any case
* assume that because one server has seen a member join the cluster all servers have.
* <p>
* The returned {@link Listener} can be used to stop listening for servers joining the cluster.
* <p>
* <pre>
* {@code
* // Start listening for members joining the cluster.
* Listener<Member> listener = server.cluster().onJoin(member -> System.out.println(member.address() + " joined!"));
*
* // Stop listening for members joining the cluster.
* listener.close();
* }
* </pre>
* @param listener The listener to be called when a member joins the cluster.
*/
void addJoinListener(Consumer<RaftMember> listener);

/**
* Removes a join listener from the cluster.
*
* @param callback The callback to be called when a member joins the cluster.
* @return The join listener.
* @param listener The listener to remove from the cluster.
*/
Listener<RaftMember> onJoin(Consumer<RaftMember> callback);
void removeJoinListener(Consumer<RaftMember> listener);

/**
* Registers a callback to be called when a member leaves the cluster.
Expand All @@ -321,22 +320,16 @@ default CompletableFuture<Void> join(NodeId... cluster) {
* changes are sequentially consistent, meaning each server in the cluster will see members leave in the same
* order, but different servers may see members leave at different points in time. Users should not in any case
* assume that because one server has seen a member leave the cluster all servers have.
* <p>
* The returned {@link Listener} can be used to stop listening for servers leaving the cluster.
* <p>
* <pre>
* {@code
* // Start listening for members leaving the cluster.
* Listener<Member> listener = server.cluster().onLeave(member -> System.out.println(member.address() + " left!"));
*
* // Stop listening for members leaving the cluster.
* listener.close();
* }
* </pre>
* @param listener The listener to be called when a member leaves the cluster.
*/
void addLeaveListener(Consumer<RaftMember> listener);

/**
* Removes a leave listener from the cluster.
*
* @param callback The callback to be called when a member leaves the cluster.
* @return The leave listener.
* @param listener The listener to remove from the cluster.
*/
Listener<RaftMember> onLeave(Consumer<RaftMember> callback);
void removeLeaveListener(Consumer<RaftMember> listener);

}
33 changes: 22 additions & 11 deletions core/src/main/java/io/atomix/protocols/raft/cluster/RaftMember.java
Expand Up @@ -16,7 +16,6 @@
package io.atomix.protocols.raft.cluster;

import io.atomix.cluster.NodeId;
import io.atomix.util.temp.Listener;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;
Expand All @@ -38,8 +37,8 @@
* communicate with the member (which may be a different {@link NodeId}).
* <p>
* Users can listen for {@link RaftMember.Type} and {@link RaftMember.Status} changes via the
* {@link #onTypeChange(Consumer)} and {@link #onStatusChange(Consumer)} methods respectively. Member types
* can be modified by virtually any member of the cluster via the {@link #promote()} and {@link #demote()}
* {@link #addTypeChangeListener(Consumer)} and {@link #addStatusChangeListener(Consumer)} methods respectively.
* Member types can be modified by virtually any member of the cluster via the {@link #promote()} and {@link #demote()}
* methods. This allows servers to modify the way dead nodes interact with the cluster and modify the
* Raft quorum size without requiring the member being modified to be available. The member status is
* controlled only by the cluster {@link RaftCluster#leader() leader}. When the leader fails to contact a
Expand Down Expand Up @@ -165,16 +164,22 @@ enum Status {
Type type();

/**
* Registers a callback to be called when the member's type changes.
* Adds a listener to be called when the member's type changes.
* <p>
* The type change callback will be called when the local server receives notification of the change in type
* to this member. Type changes may occur at different times from the perspective of different servers but are
* guaranteed to occur in the same order on all servers.
*
* @param callback The callback to be called when the member's type changes.
* @return The member type change listener.
* @param listener The listener to be called when the member's type changes.
*/
Listener<Type> onTypeChange(Consumer<Type> callback);
void addTypeChangeListener(Consumer<Type> listener);

/**
* Removes a type change listener from the member.
*
* @param listener The listener to remove from the member.
*/
void removeTypeChangeListener(Consumer<Type> listener);

/**
* Returns the member status.
Expand All @@ -200,16 +205,22 @@ enum Status {
Instant updated();

/**
* Registers a callback to be called when the member's status changes.
* Adds a listener to be called when the member's status changes.
* <p>
* The status change callback will be called when the local server receives notification of the change in status
* to this member. Status changes may occur at different times from the perspective of different servers but are
* guaranteed to occur in the same order on all servers.
*
* @param callback The callback to be called when the member's status changes.
* @return The member status change listener.
* @param listener The listener to be called when the member's status changes.
*/
void addStatusChangeListener(Consumer<Status> listener);

/**
* Removes a status change listener.
*
* @param listener The status change listener to remove.
*/
Listener<Status> onStatusChange(Consumer<Status> callback);
void removeStatusChangeListener(Consumer<Status> listener);

/**
* Promotes the member to the next highest type.
Expand Down
19 changes: 8 additions & 11 deletions core/src/main/java/io/atomix/protocols/raft/server/RaftServer.java
Expand Up @@ -16,17 +16,16 @@
package io.atomix.protocols.raft.server;

import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.error.ConfigurationException;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.cluster.RaftCluster;
import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.error.ConfigurationException;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.server.state.ServerContext;
import io.atomix.protocols.raft.server.state.StateMachineRegistry;
import io.atomix.protocols.raft.server.storage.Storage;
import io.atomix.util.Assert;
import io.atomix.util.concurrent.Futures;
import io.atomix.util.temp.CatalystThreadFactory;
import io.atomix.util.temp.Listener;
import io.atomix.util.temp.SingleThreadContext;
import io.atomix.util.temp.ThreadContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -257,7 +256,7 @@ public enum State {
protected final ServerContext context;
private volatile CompletableFuture<RaftServer> openFuture;
private volatile CompletableFuture<Void> closeFuture;
private Listener<RaftMember> electionListener;
private Consumer<RaftMember> electionListener;
private volatile boolean started;

protected RaftServer(String name, RaftServerProtocol protocol, ServerContext context) {
Expand Down Expand Up @@ -343,12 +342,10 @@ public State state() {
* </pre>
*
* @param listener The state change listener.
* @return The listener context. This can be used to unregister the election listener via
* {@link Listener#close()}.
* @throws NullPointerException If {@code listener} is {@code null}
*/
public Listener<State> onStateChange(Consumer<State> listener) {
return context.onStateChange(listener);
public void addStateChangeListener(Consumer<State> listener) {
context.addStateChangeListener(listener);
}

/**
Expand Down Expand Up @@ -532,14 +529,14 @@ private CompletableFuture<RaftServer> start(Supplier<CompletableFuture<Void>> jo
started = true;
future.complete(this);
} else {
electionListener = cluster().onLeaderElection(leader -> {
electionListener = leader -> {
if (electionListener != null) {
started = true;
future.complete(this);
electionListener.close();
cluster().removeLeaderElectionListener(electionListener);
electionListener = null;
}
});
};
}
} else {
future.completeExceptionally(error);
Expand Down
Expand Up @@ -15,8 +15,6 @@
*/
package io.atomix.protocols.raft.server.session;

import io.atomix.util.temp.Listener;

import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -65,7 +63,19 @@ public interface ServerSession {
*/
State state();

Listener<State> onStateChange(Consumer<State> callback);
/**
* Adds a state change listener to the session.
*
* @param listener the state change listener to add
*/
void addStateChangeListener(Consumer<State> listener);

/**
* Removes a state change listener from the session.
*
* @param listener the state change listener to remove
*/
void removeStateChangeListener(Consumer<State> listener);

/**
* Publishes a {@code null} named event to the session.
Expand Down
Expand Up @@ -29,6 +29,8 @@
import io.atomix.protocols.raft.server.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.server.storage.snapshot.SnapshotReader;
import io.atomix.util.Assert;
import io.atomix.util.serializer.KryoNamespaces;
import io.atomix.util.serializer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -453,7 +455,7 @@ protected InstallRequest buildInstallRequest(MemberState member) {
InstallRequest request;
synchronized (snapshot) {
// Open a new snapshot reader.
try (SnapshotReader reader = snapshot.reader()) {
try (SnapshotReader reader = snapshot.reader(Serializer.using(KryoNamespaces.RAFT))) {
// Skip to the next batch of bytes according to the snapshot chunk size and current offset.
reader.skip(member.getNextSnapshotOffset() * MAX_BATCH_SIZE);
byte[] data = new byte[Math.min(MAX_BATCH_SIZE, (int) reader.remaining())];
Expand Down
Expand Up @@ -34,6 +34,8 @@
import io.atomix.protocols.raft.server.storage.entry.QueryEntry;
import io.atomix.protocols.raft.server.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.server.storage.snapshot.SnapshotWriter;
import io.atomix.util.serializer.KryoNamespaces;
import io.atomix.util.serializer.Serializer;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -351,7 +353,7 @@ public CompletableFuture<InstallResponse> install(InstallRequest request) {
}

// Write the data to the snapshot.
try (SnapshotWriter writer = pendingSnapshot.writer()) {
try (SnapshotWriter writer = pendingSnapshot.writer(Serializer.using(KryoNamespaces.RAFT))) {
writer.write(request.data());
}

Expand Down
Expand Up @@ -16,19 +16,17 @@
package io.atomix.protocols.raft.server.state;

import io.atomix.cluster.NodeId;
import io.atomix.protocols.raft.cluster.RaftCluster;
import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.error.RaftError;
import io.atomix.protocols.raft.protocol.JoinRequest;
import io.atomix.protocols.raft.protocol.LeaveRequest;
import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.protocols.raft.server.RaftServer;
import io.atomix.protocols.raft.cluster.RaftCluster;
import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.server.storage.system.Configuration;
import io.atomix.util.Assert;
import io.atomix.util.concurrent.Futures;
import io.atomix.util.temp.CatalystThreadFactory;
import io.atomix.util.temp.Listener;
import io.atomix.util.temp.Listeners;
import io.atomix.util.temp.Scheduled;
import io.atomix.util.temp.SingleThreadContext;
import org.slf4j.Logger;
Expand Down Expand Up @@ -71,8 +69,8 @@ final class RaftClusterState implements RaftCluster, AutoCloseable {
private volatile CompletableFuture<Void> joinFuture;
private volatile Scheduled leaveTimeout;
private volatile CompletableFuture<Void> leaveFuture;
private final Listeners<RaftMember> joinListeners = new Listeners<>();
private final Listeners<RaftMember> leaveListeners = new Listeners<>();
private final Set<Consumer<RaftMember>> joinListeners = new CopyOnWriteArraySet<>();
private final Set<Consumer<RaftMember>> leaveListeners = new CopyOnWriteArraySet<>();

RaftClusterState(RaftMember.Type type, NodeId localNodeId, ServerContext context) {
Instant time = Instant.now();
Expand Down Expand Up @@ -138,8 +136,13 @@ public long term() {
}

@Override
public Listener<RaftMember> onLeaderElection(Consumer<RaftMember> callback) {
return context.onLeaderElection(callback);
public void addLeaderElectionListener(Consumer<RaftMember> callback) {
context.addLeaderElectionListener(callback);
}

@Override
public void removeLeaderElectionListener(Consumer<RaftMember> listener) {
context.removeLeaderElectionListener(listener);
}

@Override
Expand All @@ -162,13 +165,23 @@ public RaftMemberState member(NodeId id) {
}

@Override
public Listener<RaftMember> onJoin(Consumer<RaftMember> callback) {
return joinListeners.add(callback);
public void addJoinListener(Consumer<RaftMember> listener) {
joinListeners.add(listener);
}

@Override
public void removeJoinListener(Consumer<RaftMember> listener) {
joinListeners.remove(listener);
}

@Override
public void addLeaveListener(Consumer<RaftMember> listener) {
leaveListeners.add(listener);
}

@Override
public Listener<RaftMember> onLeave(Consumer<RaftMember> callback) {
return leaveListeners.add(callback);
public void removeLeaveListener(Consumer<RaftMember> listener) {
leaveListeners.remove(listener);
}

/**
Expand Down Expand Up @@ -585,7 +598,7 @@ RaftClusterState configure(Configuration configuration) {
this.members.add(state.getMember());
this.remoteMembers.add(state);
membersMap.put(member.id(), state);
joinListeners.accept(state.getMember());
joinListeners.forEach(l -> l.accept(member));
}

// If the member type has changed, update the member type and reset its state.
Expand Down Expand Up @@ -632,7 +645,7 @@ RaftClusterState configure(Configuration configuration) {
memberType.remove(member);
}
membersMap.remove(member.getMember().id());
leaveListeners.accept(member.getMember());
leaveListeners.forEach(l -> l.accept(member.getMember()));
} else {
i++;
}
Expand Down

0 comments on commit 4846033

Please sign in to comment.